PySpark Handledning för nybörjare: Lär dig med EXEMPEL
Innan du lär dig PySpark, låt oss förstå:
Vad är Apache Spark?
Spark är en big data-lösning som har visat sig vara enklare och snabbare än Hadoop MapReduce. Spark är en programvara med öppen källkod utvecklad av UC Berkeley RAD lab 2009. Sedan den släpptes för allmänheten 2010, Spark har vuxit i popularitet och används i branschen med en aldrig tidigare skådad omfattning.
I era av Stora data, behöver utövare mer än någonsin snabba och pålitliga verktyg för att bearbeta streaming av data. Tidigare verktyg som MapReduce var favoriter men var långsamma. För att övervinna detta problem, Spark erbjuder en lösning som är både snabb och allmänt ändamålsenlig. Den största skillnaden mellan Spark och MapReduce är det Spark kör beräkningar i minnet under senare på hårddisken. Det möjliggör höghastighetsåtkomst och databehandling, vilket minskar tiderna från timmar till minuter.
Vad är PySpark?
PySpark är ett verktyg skapat av Apache Spark Community för användning Python med Spark. Det gör det möjligt att arbeta med RDD (Resilient Distributed Dataset) i Python. Den erbjuder också PySpark Skal för att länka Python API:er med Spark kärna att initiera Spark Sammanhang. Spark är namnmotorn för att realisera klusterberäkning, medan PySpark is Pythons bibliotek att använda Spark.
Hur fungerar det? Spark arbete?
Spark är baserad på beräkningsmotor, vilket innebär att den tar hand om schemaläggning, distribution och övervakning av applikationen. Varje uppgift utförs på olika arbetsmaskiner som kallas datorkluster. Ett datorkluster avser fördelningen av uppgifter. En maskin utför en uppgift, medan de andra bidrar till den slutliga produktionen genom en annan uppgift. I slutändan är alla uppgifter aggregerade för att producera en output. De Spark admin ger en 360 översikt över olika Spark Jobb.

Spark är utformad för att arbeta med
- Python
- Java
- Skala
- SQL
Ett betydande inslag i Spark är den stora mängden inbyggda bibliotek, inklusive MLlib för maskininlärning. Spark är också designad för att fungera med Hadoop-kluster och kan läsa den breda typen av filer, inklusive Hive-data, CSV, JSON, Casandra-data bland annat.
Varför användning Spark?
Som framtida datautövare bör du vara bekant med pythons berömda bibliotek: Pandas och scikit-learn. Dessa två bibliotek är fantastiska för att utforska dataset upp till medelstora. Regelbundna maskininlärningsprojekt är uppbyggda kring följande metodik:
- Ladda data till disken
- Importera data till maskinens minne
- Bearbeta/analysera data
- Bygg maskininlärningsmodellen
- Lagra förutsägelsen tillbaka på disken
Problemet uppstår om dataforskaren vill bearbeta data som är för stora för en dator. Under tidigare dagar av datavetenskap provade utövarna eftersom utbildning i enorma datamängder inte alltid behövdes. Dataforskaren skulle hitta ett bra statistiskt urval, utföra en extra robusthetskontroll och komma med en utmärkt modell.
Det finns dock några problem med detta:
- Speglar datasetet den verkliga världen?
- Innehåller uppgifterna ett specifikt exempel?
- Är modellen lämplig för provtagning?
Ta användarrekommendationer till exempel. Rekommenderar förlitar sig på att jämföra användare med andra användare när de utvärderar deras preferenser. Om datautövaren bara tar en delmängd av datan kommer det inte att finnas en kohort av användare som är väldigt lika varandra. Rekommendationer måste köras på hela datasetet eller inte alls.
Vad är lösningen?
Lösningen har varit uppenbar under lång tid, dela upp problemet på flera datorer. Parallell beräkning kommer också med flera problem. Utvecklare har ofta problem med att skriva parallell kod och i slutändan måste de lösa en massa av de komplexa frågorna kring själva multibearbetningen.
Pyspark ger dataforskaren ett API som kan användas för att lösa problemen med parallella dataförlopp. Pyspark hanterar komplexiteten i multiprocessing, som att distribuera data, distribuera kod och samla in utdata från arbetarna på ett kluster av maskiner.
Spark kan köras fristående men körs oftast ovanpå ett klusterberäkningsramverk som Hadoop. I test och utveckling kan dock en dataforskare köra effektivt Spark på sina utvecklingsboxar eller bärbara datorer utan ett kluster
• En av de främsta fördelarna med Spark är att bygga en arkitektur som omfattar dataströmningshantering, sömlösa datafrågor, maskininlärningsförutsägelse och realtidsåtkomst till olika analyser.
• Spark arbetar nära med SQL-språk, dvs strukturerad data. Det gör det möjligt att söka efter data i realtid.
• Data scientists huvuduppgift är att analysera och bygga prediktiva modeller. Kort sagt, en dataforskare behöver veta hur man frågar efter data med hjälp av SQL, producera en statistisk rapport och använda maskininlärning för att producera förutsägelser. Dataforskare lägger en betydande del av sin tid på att rengöra, transformera och analysera data. När datasetet eller dataarbetsflödet är klart använder dataforskaren olika tekniker för att upptäcka insikter och dolda mönster. Datamanipulationen bör vara robust och lika lätt att använda. Spark är det rätta verktyget tack vare dess snabbhet och rika API:er.
I denna PySpark tutorial, kommer du att lära dig hur du bygger en klassificerare med PySpark exempel.
Hur man installerar PySpark med AWS
Ocuco-landskapet Jupyter team bygger en Docker-bild att köra Spark effektivt. Nedan är stegen du kan följa för att installera PySpark instans i AWS.
Se vår handledning på AWS och TensorFlow
Steg 1: Skapa en instans
Först och främst måste du skapa en instans. Gå till ditt AWS-konto och starta instansen. Du kan öka lagringsutrymmet upp till 15g och använda samma säkerhetsgrupp som i TensorFlow-handledningen.
Steg 2: Öppna anslutningen
Öppna anslutningen och installera docker-behållaren. För mer information, se handledningen med TensorFlow med Hamnarbetare. Observera att du måste vara i rätt arbetskatalog.
Kör helt enkelt dessa koder för att installera Docker:
sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit
Steg 3: Öppna anslutningen igen och installera Spark
När du har öppnat anslutningen igen kan du installera bilden som innehåller PySpark.
## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree
Steg 4: Öppna Jupyter
Kontrollera behållaren och dess namn
docker ps
Starta docker med docker logs följt av namnet på docker. Docker loggar till exempel zealous_goldwasser
Gå till din webbläsare och starta Jupyter. Adressen är http://localhost:8888/. Klistra in lösenordet från terminalen.
Anmärkningar: om du vill ladda upp/ladda ner en fil till din AWS-maskin kan du använda programvaran Cyberduck, https://cyberduck.io/.
Hur man installerar PySpark on Windows/Mac med Conda
Följande är en detaljerad process om hur man installerar PySpark on Windows/Mac med Anaconda:
Så här installerar du Spark på din lokala dator är en rekommenderad praxis att skapa en ny conda-miljö. Denna nya miljö kommer att installeras Python 3.6 Spark och alla beroenden.
Mac-användare
cd anaconda3 touch hello-spark.yml vi hello-spark.yml
Windows Användare
cd C:\Users\Admin\Anaconda3 echo.>hello-spark.yml notepad hello-spark.yml
Du kan redigera .yml-filen. Var försiktig med indraget. Två platser krävs innan –
name: hello-spark
dependencies:
- python=3.6
- jupyter
- ipython
- numpy
- numpy-base
- pandas
- py4j
- pyspark
- pytz
Spara den och skapa miljön. Det tar lite tid
conda env create -f hello-spark.yml
För mer information om platsen, se handledningen Installera TensorFlow
Du kan kontrollera all miljö som är installerad i din maskin
conda env list
Activate hello-spark
Mac-användare
source activate hello-spark
Windows Användare
activate hello-spark
Obs: Du har redan skapat en specifik TensorFlow-miljö för att köra handledningarna på TensorFlow. Det är bekvämare att skapa en ny miljö som skiljer sig från hello-tf. Det är ingen mening att överbelasta hello-tf med Spark eller andra maskininlärningsbibliotek.
Föreställ dig att det mesta av ditt projekt involverar TensorFlow, men du måste använda Spark för ett särskilt projekt. Du kan ställa in en TensorFlow-miljö för alla dina projekt och skapa en separat miljö för Spark. Du kan lägga till så många bibliotek i Spark miljö som du vill utan att störa TensorFlow-miljön. När du är klar med Sparks projekt kan du radera det utan att påverka TensorFlow-miljön.
Jupyter
Öppet Jupyter Notebook och försök om PySpark fabrik. I en ny anteckningsbok klistra in följande PySpark Exempelkod:
import pyspark from pyspark import SparkContext sc =SparkContext()
Om ett fel visas är det troligt att Java är inte installerat på din maskin. I mac, öppna terminalen och skriv java -version, om det finns en java-version, se till att den är 1.8. I Windows, gå till Application och kontrollera om det finns en Java mapp. Om det finns en Java mapp, kolla det Java 1.8 är installerad. När detta skrivs, PySpark är inte kompatibel med Java9 och över.
Om du behöver installera Java, du att tänka länk och ladda ner jdk-8u181-windows-x64.exe
För Mac-användare rekommenderas att använda `brew.`
brew tap caskroom/versions brew cask install java8
Se denna steg för steg handledning på hur man installerar Java
Anmärkningar: Använd remove för att radera en miljö helt.
conda env remove -n hello-spark -y
Spark Sammanhang
SparkKontext är den interna motorn som tillåter anslutningarna med klustren. Om du vill genomföra en operation behöver du en SparkSammanhang.
Skapa en SparkSammanhang
Först och främst måste du initiera en SparkSammanhang.
import pyspark from pyspark import SparkContext sc =SparkContext()
Nu när den SparkKontexten är klar, du kan skapa en samling data som kallas RDD, Resilient Distributed Dataset. Beräkningar i en RDD parallelliseras automatiskt över klustret.
nums= sc.parallelize([1,2,3,4])
Du kan komma åt den första raden med take
nums.take(1)
[1]
Du kan tillämpa en transformation på data med en lambdafunktion. I PySpark exemplet nedan returnerar du kvadraten av siffror. Det är en kartomvandling
squared = nums.map(lambda x: x*x).collect()
for num in squared:
print('%i ' % (num))
1 4 9 16
SQLContext
Ett bekvämare sätt är att använda DataFrame. SparkKontexten är redan inställd, du kan använda den för att skapa dataramen. Du måste också deklarera SQLContext
SQLContext gör det möjligt att ansluta motorn med olika datakällor. Den används för att initiera funktionerna i Spark sql.
from pyspark.sql import Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
Nu i detta Spark handledning Python, låt oss skapa en lista med tupel. Varje tupel kommer att innehålla namnet på personerna och deras ålder. Fyra steg krävs:
Steg 1) Skapa listan över tuple med informationen
[('John',19),('Smith',29),('Adam',35),('Henry',50)]
Steg 2) Bygg en RDD
rdd = sc.parallelize(list_p)
Steg 3) Konvertera tuplarna
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
Steg 4) Skapa en DataFrame-kontext
sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)
Om du vill komma åt typen av varje funktion kan du använda printSchema()
DF_ppl.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true)
Exempel på maskininlärning med PySpark
Nu när du har en kort uppfattning om Spark och SQLContext är du redo att bygga ditt första maskininlärningsprogram.
Följande är stegen för att bygga ett maskininlärningsprogram med PySpark:
- Steg 1) Grundläggande drift med PySpark
- Steg 2) Förbehandling av data
- Steg 3) Bygg en pipeline för databehandling
- Steg 4) Bygg klassificeraren: logistik
- Steg 5) Träna och utvärdera modellen
- Steg 6) Justera hyperparametern
I denna PySpark Machine Learning tutorial, vi kommer att använda vuxendatauppsättningen. Syftet med denna handledning är att lära sig hur man använder Pyspark. För mer information om datasetet, se denna handledning.
Observera att datasetet inte är signifikant och du kanske tror att beräkningen tar lång tid. Spark är utformad för att behandla en betydande mängd data. Sparks prestanda ökar i förhållande till andra maskininlärningsbibliotek när den bearbetade datamängden växer sig större.
Steg 1) Grundläggande drift med PySpark
Först och främst måste du initiera SQLContext är inte redan initierad än.
#from pyspark.sql import SQLContext url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" from pyspark import SparkFiles sc.addFile(url) sqlContext = SQLContext(sc)
sedan kan du läsa cvs-filen med sqlContext.read.csv. Du använder inferSchema satt till True för att berätta Spark för att automatiskt gissa typen av data. Som standard är den vänd till False.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Låt oss ta en titt på datatypen
df.printSchema() root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education_num: integer (nullable = true) |-- marital: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: integer (nullable = true) |-- capital_loss: integer (nullable = true) |-- hours_week: integer (nullable = true) |-- native_country: string (nullable = true) |-- label: string (nullable = true)
Du kan se data med show.
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |age|workclass |fnlwgt|education|education_num|marital |occupation |relationship |race |sex |capital_gain|capital_loss|hours_week|native_country|label| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ |39 |State-gov |77516 |Bachelors|13 |Never-married |Adm-clerical |Not-in-family|White|Male |2174 |0 |40 |United-States |<=50K| |50 |Self-emp-not-inc|83311 |Bachelors|13 |Married-civ-spouse|Exec-managerial |Husband |White|Male |0 |0 |13 |United-States |<=50K| |38 |Private |215646|HS-grad |9 |Divorced |Handlers-cleaners|Not-in-family|White|Male |0 |0 |40 |United-States |<=50K| |53 |Private |234721|11th |7 |Married-civ-spouse|Handlers-cleaners|Husband |Black|Male |0 |0 |40 |United-States |<=50K| |28 |Private |338409|Bachelors|13 |Married-civ-spouse|Prof-specialty |Wife |Black|Female|0 |0 |40 |Cuba |<=50K| +---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+ only showing top 5 rows
Om du inte satte inderShema till True, här är vad som händer med typen. Det finns alla i sträng.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema= False)
df_string.printSchema()
root
|-- age: string (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: string (nullable = true)
|-- education: string (nullable = true)
|-- education_num: string (nullable = true)
|-- marital: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital_gain: string (nullable = true)
|-- capital_loss: string (nullable = true)
|-- hours_week: string (nullable = true)
|-- native_country: string (nullable = true)
|-- label: string (nullable = true)
För att konvertera den kontinuerliga variabeln till rätt format kan du använda omarbeta kolumnerna. Du kan använda withColumn för att berätta Spark vilken kolumn som ska utföra omvandlingen.
# Import all from `sql.types`
from pyspark.sql.types import *
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
for name in names:
df = df.withColumn(name, df[name].cast(newType))
return df
# List of continuous features
CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
|-- age: float (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: float (nullable = true)
|-- education: string (nullable = true)
|-- education_num: float (nullable = true)
|-- marital: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital_gain: float (nullable = true)
|-- capital_loss: float (nullable = true)
|-- hours_week: float (nullable = true)
|-- native_country: string (nullable = true)
|-- label: string (nullable = true)
from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()
Välj kolumner
Du kan välja och visa raderna med select och namnen på funktionerna. Nedan är ålder och fnlwgt valda.
df.select('age','fnlwgt').show(5)
+---+------+ |age|fnlwgt| +---+------+ | 39| 77516| | 50| 83311| | 38|215646| | 53|234721| | 28|338409| +---+------+ only showing top 5 rows
Räkna efter grupp
Om du vill räkna antalet förekomster per grupp kan du kedja:
- Grupp av()
- räkna()
tillsammans. I PySpark exemplet nedan räknar du antalet rader efter utbildningsnivå.
df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+ | education|count| +------------+-----+ | Preschool| 51| | 1st-4th| 168| | 5th-6th| 333| | Doctorate| 413| | 12th| 433| | 9th| 514| | Prof-school| 576| | 7th-8th| 646| | 10th| 933| | Assoc-acdm| 1067| | 11th| 1175| | Assoc-voc| 1382| | Masters| 1723| | Bachelors| 5355| |Some-college| 7291| | HS-grad|10501| +------------+-----+
Beskriv data
För att få en sammanfattande statistik över datan kan du använda describe(). Den kommer att beräkna:
- räkna
- betyda
- standardavvikelse
- min
- max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ |summary| age| workclass| fnlwgt| education| education_num| marital| occupation|relationship| race| sex| capital_gain| capital_loss| hours_week|native_country|label| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+ | count| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561| 32561|32561| | mean| 38.58164675532078| null|189778.36651208502| null| 10.0806793403151| null| null| null| null| null|1077.6488437087312| 87.303829734959|40.437455852092995| null| null| | stddev|13.640432553581356| null|105549.97769702227| null|2.572720332067397| null| null| null| null| null| 7385.292084840354|402.960218649002|12.347428681731838| null| null| | min| 17| ?| 12285| 10th| 1|Divorced| ?| Husband|Amer-Indian-Eskimo|Female| 0| 0| 1| ?|<=50K| | max| 90|Without-pay| 1484705|Some-college| 16| Widowed|Transport-moving| Wife| White| Male| 99999| 4356| 99| Yugoslavia| >50K| +-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
Om du bara vill ha sammanfattande statistik för en kolumn, lägg till namnet på kolumnen inuti describe()
df.describe('capital_gain').show()
+-------+------------------+ |summary| capital_gain| +-------+------------------+ | count| 32561| | mean|1077.6488437087312| | stddev| 7385.292084840354| | min| 0| | max| 99999| +-------+------------------+
Korstabellberäkning
Vid vissa tillfällen kan det vara intressant att se den beskrivande statistiken mellan två parvisa kolumner. Till exempel kan du räkna antalet personer med inkomst under eller över 50k efter utbildningsnivå. Denna operation kallas en korstabell.
df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+ |age_label|<=50K|>50K| +---------+-----+----+ | 17| 395| 0| | 18| 550| 0| | 19| 710| 2| | 20| 753| 0| | 21| 717| 3| | 22| 752| 13| | 23| 865| 12| | 24| 767| 31| | 25| 788| 53| | 26| 722| 63| | 27| 754| 81| | 28| 748| 119| | 29| 679| 134| | 30| 690| 171| | 31| 705| 183| | 32| 639| 189| | 33| 684| 191| | 34| 643| 243| | 35| 659| 217| | 36| 635| 263| +---------+-----+----+ only showing top 20 rows
Du kan se att inga människor har intäkter över 50 XNUMX när de är unga.
Släpp kolumn
Det finns två intuitiva API för att släppa kolumner:
- drop(): Släpp en kolumn
- dropna(): Släpp NA:s
Nedan släpper du kolumnen utbildning_num
df.drop('education_num').columns
['age',
'workclass',
'fnlwgt',
'education',
'marital',
'occupation',
'relationship',
'race',
'sex',
'capital_gain',
'capital_loss',
'hours_week',
'native_country',
'label']
Filtrera data
Du kan använda filter() för att tillämpa beskrivande statistik i en delmängd av data. Till exempel kan du räkna antalet personer över 40 år
df.filter(df.age > 40).count()
13443
Descriptive statistik per grupp
Slutligen kan du gruppera data efter grupp och beräkna statistiska operationer som medelvärdet.
df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+ | marital| avg(capital_gain)| +--------------------+------------------+ | Separated| 535.5687804878049| | Never-married|376.58831788823363| |Married-spouse-ab...| 653.9832535885167| | Divorced| 728.4148098131893| | Widowed| 571.0715005035247| | Married-AF-spouse| 432.6521739130435| | Married-civ-spouse|1764.8595085470085| +--------------------+------------------+
Steg 2) Dataförbehandling
Databehandling är ett kritiskt steg i maskininlärning. När du har tagit bort skräpdata får du några viktiga insikter.
Till exempel vet du att ålder inte är en linjär funktion med inkomsten. När människor är unga är deras inkomst vanligtvis lägre än medelåldern. Efter pensioneringen använder ett hushåll sitt sparande, vilket innebär en minskad inkomst. För att fånga det här mönstret kan du lägga till en kvadrat till åldersfunktionen
Lägg till åldersruta
För att lägga till en ny funktion måste du:
- Välj kolumnen
- Tillämpa transformationen och lägg till den i DataFrame
from pyspark.sql.functions import *
# 1 Select the column
age_square = df.select(col("age")**2)
# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)
df.printSchema()
root
|-- age: integer (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: integer (nullable = true)
|-- education: string (nullable = true)
|-- education_num: integer (nullable = true)
|-- marital: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital_gain: integer (nullable = true)
|-- capital_loss: integer (nullable = true)
|-- hours_week: integer (nullable = true)
|-- native_country: string (nullable = true)
|-- label: string (nullable = true)
|-- age_square: double (nullable = true)
Du kan se att age_square har lagts till i dataramen. Du kan ändra ordningen på variablerna med select. Nedan tar du med age_square direkt efter ålder.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')
Uteslut Holand-Nederländerna
När en grupp inom en funktion bara har en observation, ger den ingen information till modellen. Tvärtom kan det leda till ett fel under korsvalideringen.
Låt oss kontrollera hushållets ursprung
df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+ | native_country|count(native_country)| +--------------------+---------------------+ | Holand-Netherlands| 1| | Scotland| 12| | Hungary| 13| | Honduras| 13| |Outlying-US(Guam-...| 14| | Yugoslavia| 16| | Thailand| 18| | Laos| 18| | Cambodia| 19| | Trinadad&Tobago| 19| | Hong| 20| | Ireland| 24| | Ecuador| 28| | Greece| 29| | France| 29| | Peru| 31| | Nicaragua| 34| | Portugal| 37| | Iran| 43| | Haiti| 44| +--------------------+---------------------+ only showing top 20 rows
Funktionen native_country har bara ett hushåll som kommer från Nederland. Du utesluter det.
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Steg 3) Bygg en pipeline för databehandling
I likhet med scikit-learn har Pyspark ett pipeline-API.
En pipeline är mycket bekväm för att upprätthålla strukturen för data. Du skjuter in data i pipelinen. Inne i pipelinen görs olika operationer, utgången används för att mata algoritmen.
En universell transformation inom maskininlärning består till exempel av att konvertera en sträng till en het kodare, dvs en kolumn av en grupp. En varmkodare är vanligtvis en matris full av nollor.
Stegen för att transformera data liknar mycket scikit-learn. Du behöver:
- Indexera strängen till numerisk
- Skapa den enda heta kodaren
- Förvandla data
Två API:er gör jobbet: StringIndexer, OneHotEncoder
- Först och främst väljer du strängkolumnen som ska indexeras. InputCol är namnet på kolumnen i datamängden. outputCol är det nya namnet till den transformerade kolumnen.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
- Anpassa data och transformera den
model = stringIndexer.fit(df) `indexed = model.transform(df)``
- Skapa nyhetskolumner baserat på gruppen. Till exempel, om det finns 10 grupper i funktionen kommer den nya matrisen att ha 10 kolumner, en för varje grupp.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") model = stringIndexer.fit(df) indexed = model.transform(df) encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") encoded = encoder.transform(indexed) encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ |age|age_square| workclass|fnlwgt|education|education_num| marital| occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ | 39| 1521.0| State-gov| 77516|Bachelors| 13| Never-married| Adm-clerical|Not-in-family|White|Male| 2174| 0| 40| United-States|<=50K| 4.0|(9,[4],[1.0])| | 50| 2500.0|Self-emp-not-inc| 83311|Bachelors| 13|Married-civ-spouse|Exec-managerial| Husband|White|Male| 0| 0| 13| United-States|<=50K| 1.0|(9,[1],[1.0])| +---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+ only showing top 2 rows
Bygg pipeline
Du kommer att bygga en pipeline för att konvertera alla exakta funktioner och lägga till dem i den slutliga datamängden. Pipelinen kommer att ha fyra operationer, men lägg gärna till så många operationer du vill.
- Koda de kategoriska uppgifterna
- Indexera etikettfunktionen
- Lägg till kontinuerlig variabel
- Montera stegen.
Varje steg lagras i en lista med namn med steg. Denna lista kommer att tala om för VectorAssembler vilken operation som ska utföras inuti pipelinen.
1. Koda kategoridata
Detta steg är exakt detsamma som exemplet ovan, förutom att du loopar över alla kategoriska funktioner.
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
outputCols=[categoricalCol + "classVec"])
stages += [stringIndexer, encoder]
2. Indexera etikettfunktionen
Spark, liksom många andra bibliotek, accepterar inte strängvärden för etiketten. Du konverterar etikettfunktionen med StringIndexer och lägger till den i liststadierna
# Convert label into label indices using the StringIndexer label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel") stages += [label_stringIdx]
3. Lägg till kontinuerlig variabel
VectorAssemblerens inputCols är en lista med kolumner. Du kan skapa en ny lista som innehåller alla nya kolumner. Koden nedan fyller listan med kodade kategoriska funktioner och de kontinuerliga funktionerna.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
4. Montera stegen.
Slutligen klarar du alla steg i VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Nu när alla steg är klara skickar du data till pipelinen.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove)
Om du kontrollerar den nya datamängden kan du se att den innehåller alla funktioner, transformerade och inte transformerade. Du är bara intresserad av den nya etiketten och funktionerna. Funktionerna inkluderar alla transformerade funktioner och de kontinuerliga variablerna.
model.take(1)
[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]
Steg 4) Bygg klassificeraren: logistik
För att göra beräkningen snabbare konverterar du modellen till en DataFrame.
Du måste välja ny etikett och funktioner från modellen med hjälp av kartan.
from pyspark.ml.linalg import DenseVector input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
Du är redo att skapa tågdata som en DataFrame. Du använder sqlContext
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
Kontrollera den andra raden
df_train.show(2)
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0|[0.0,0.0,0.0,0.0,...| | 0.0|[0.0,1.0,0.0,0.0,...| +-----+--------------------+ only showing top 2 rows
Skapa ett tåg/testset
Du delar upp datasetet 80/20 med randomSplit.
# Split the data into train and test sets train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
Låt oss räkna hur många personer med inkomst under/över 50k i både träning och testset
train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 19698| | 1.0| 6263| +-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
Bygg den logistiska regressorn
Sist men inte minst kan du bygga klassificeraren. Pyspark har ett API som heter LogisticRegression för att utföra logistisk regression.
Du initierar lr genom att ange etikettkolumnen och funktionskolumner. Du ställer in maximalt 10 iterationer och lägger till en regulariseringsparameter med ett värde på 0.3. Observera att i nästa avsnitt kommer du att använda korsvalidering med ett parameterrutnät för att ställa in modellen
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression
# Initialize `lr`
lr = LogisticRegression(labelCol="label",
featuresCol="features",
maxIter=10,
regParam=0.3)
# Fit the data to the model
linearModel = lr.fit(train_data)
#Du kan se koefficienterna från regressionen
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698] Intercept: -1.9884177974805692
Steg 5) Träna och utvärdera modellen
För att generera förutsägelse för din testuppsättning,
Du kan använda linearModel med transform() på test_data
# Make predictions on test data using the transform() method. predictions = linearModel.transform(test_data)
Du kan skriva ut elementen i förutsägelser
predictions.printSchema() root |-- label: double (nullable = true) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false)
Du är intresserad av etiketten, förutsägelsen och sannolikheten
selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+ |label|prediction| probability| +-----+----------+--------------------+ | 0.0| 0.0|[0.91560704124179...| | 0.0| 0.0|[0.92812140213994...| | 0.0| 0.0|[0.92161406774159...| | 0.0| 0.0|[0.96222760777142...| | 0.0| 0.0|[0.66363283056957...| | 0.0| 0.0|[0.65571324475477...| | 0.0| 0.0|[0.73053376932829...| | 0.0| 1.0|[0.31265053873570...| | 0.0| 0.0|[0.80005907577390...| | 0.0| 0.0|[0.76482251301640...| | 0.0| 0.0|[0.84447301189069...| | 0.0| 0.0|[0.75691912026619...| | 0.0| 0.0|[0.60902504096722...| | 0.0| 0.0|[0.80799228385509...| | 0.0| 0.0|[0.87704364852567...| | 0.0| 0.0|[0.83817652582377...| | 0.0| 0.0|[0.79655423248500...| | 0.0| 0.0|[0.82712311232246...| | 0.0| 0.0|[0.81372823882016...| | 0.0| 0.0|[0.59687710752201...| +-----+----------+--------------------+ only showing top 20 rows
Utvärdera modellen
Du måste titta på noggrannhetsmåttet för att se hur bra (eller dåligt) modellen presterar. För närvarande finns det inget API att beräkna noggrannhetsmåttet i Spark. Standardvärdet är ROC, mottagarens funktionskarakteristikkurva. Det är ett annat mått som tar hänsyn till den falska positiva frekvensen.
Innan du tittar på ROC, låt oss konstruera noggrannhetsmåttet. Du är mer bekant med detta mått. Noggrannhetsmåttet är summan av den korrekta förutsägelsen över det totala antalet observationer.
Du skapar en DataFrame med etiketten och `prediction.
cm = predictions.select("label", "prediction")
Du kan kontrollera antalet klasser i etiketten och förutsägelsen
cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+ |label|count(label)| +-----+------------+ | 0.0| 5021| | 1.0| 1578| +-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+ |prediction|count(prediction)| +----------+-----------------+ | 0.0| 5982| | 1.0| 617| +----------+-----------------+
Till exempel, i testsetet finns det 1578 hushåll med en inkomst över 50k och 5021 under. Klassificeraren förutspådde dock 617 hushåll med inkomster över 50k.
Du kan beräkna noggrannheten genom att beräkna antalet när etiketten är korrekt klassificerad över det totala antalet rader.
cm.filter(cm.label == cm.prediction).count() / cm.count()
0.8237611759357478
Du kan slå ihop allt och skriva en funktion för att beräkna noggrannheten.
def accuracy_m(model):
predictions = model.transform(test_data)
cm = predictions.select("label", "prediction")
acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
print("Model accuracy: %.3f%%" % (acc * 100))
accuracy_m(model = linearModel)
Model accuracy: 82.376%
ROC-mått
Modulen BinaryClassificationEvaluator innehåller ROC-måtten. Mottagaren Operating Karakteristisk kurva är ett annat vanligt verktyg som används med binär klassificering. Den är väldigt lik precision/återkallelsekurvan, men istället för att plotta precision mot återkallelse visar ROC-kurvan den sanna positiva frekvensen (dvs. återkallelse) mot den falska positiva frekvensen. Den falska positiva frekvensen är förhållandet mellan negativa instanser som felaktigt klassificeras som positiva. Det är lika med ett minus den verkliga negativa kursen. Den sanna negativa frekvensen kallas också specificitet. Därför plottar ROC-kurvan sensitivitet (återkallelse) mot 1 – specificitet
### Use ROC from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluate model evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") print(evaluator.evaluate(predictions)) print(evaluator.getMetricName())
0.8940481662695192areaUnderROC
print(evaluator.evaluate(predictions))
0.8940481662695192
Steg 6) Ställ in hyperparametern
Sist men inte minst kan du ställa in hyperparametrarna. Liknande scikit lära sig du skapar ett parameterrutnät och lägger till de parametrar du vill justera.
För att minska tiden för beräkningen ställer du bara in regulariseringsparametern med endast två värden.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
.addGrid(lr.regParam, [0.01, 0.5])
.build())
Slutligen utvärderar du modellen med att använda korsvalieringsmetoden med 5 veck. Det tar cirka 16 minuter att träna.
from time import *
start_time = time()
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
estimatorParamMaps=paramGrid,
evaluator=evaluator, numFolds=5)
# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)
Tid att träna modell: 978.807 sekunder
Den bästa regulariseringshyperparametern är 0.01, med en noggrannhet på 85.316 procent.
accuracy_m(model = cvModel) Model accuracy: 85.316%
Du kan extrahera den rekommenderade parametern genom att kedja cvModel.bestModel med extractParamMap()
bestModel = cvModel.bestModel bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}
Sammanfattning
Spark är ett grundläggande verktyg för en dataforskare. Det låter utövaren koppla en app till olika datakällor, utföra dataanalys sömlöst eller lägga till en prediktiv modell.
Till att börja med Spark, måste du initiera en Spark Sammanhang med:
'SparkSammanhang()'
och och SQL sammanhang för att ansluta till en datakälla:
'SQLContext()'
I handledningen lär du dig hur du tränar en logistisk regression:
- Konvertera datamängden till en dataram med:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) sqlContext.createDataFrame(input_data, ["label", "features"])
Observera att etikettens kolumnnamn är newlabel och alla funktioner är samlade i funktioner. Ändra dessa värden om de är olika i din datauppsättning.
- Skapa tåget/testsetet
randomSplit([.8,.2],seed=1234)
- Träna modellen
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
- Gör förutsägelser
linearModel.transform()

