PySpark Посібник для початківців: навчайтеся на ПРИКЛАДАХ

Перш ніж вивчати піSpark, давайте розбиратися:

Що таке Apache Spark?

Spark це рішення для великих даних, яке було доведено легшим і швидшим, ніж Hadoop MapReduce. Spark це програмне забезпечення з відкритим вихідним кодом, розроблене лабораторією UC Berkeley RAD у 2009 році. Оскільки воно було випущено для громадськості у 2010 році, Spark зросла популярність і використовується в галузі з безпрецедентним масштабом.

В епоху о Великий данихпрактикуючим спеціалістам як ніколи потрібні швидкі та надійні інструменти для обробки потокових даних. Попередні інструменти, такі як MapReduce, були улюбленими, але працювали повільно. Щоб подолати цю проблему, Spark пропонує швидке та універсальне рішення. Основна відмінність між Spark і це MapReduce Spark виконує обчислення в пам'яті на жорсткому диску. Це забезпечує високу швидкість доступу та обробки даних, скорочуючи час від годин до хвилин.

Що таке PySpark?

PySpark це інструмент, створений Apache Spark Спільнота для використання Python з Spark. Це дозволяє працювати з RDD (Resilient Distributed Dataset) в Python. Він також пропонує PySpark Оболонка для посилання Python API з Spark ядро ініціювати Spark Контекст. Spark це назва двигуна для реалізації кластерних обчислень, тоді як PySpark is PythonБібліотека для використання Spark.

Як Spark працювати?

Spark заснований на обчислювальному механізмі, тобто він піклується про програму планування, розподілу та моніторингу. Кожне завдання виконується на різних робочих машинах, які називаються обчислювальним кластером. Обчислювальний кластер відноситься до розподілу завдань. Одна машина виконує одне завдання, тоді як інші роблять свій внесок у кінцевий результат через інше завдання. Зрештою, усі завдання об’єднуються для отримання результату. The Spark адміністратор надає 360 огляд різноманітних Spark Вакансії.

Як Spark Робота
Як Spark Робота

Spark призначений для роботи з

  • Python
  • Java
  • масштаб
  • SQL

Суттєвою особливістю в Spark це велика кількість вбудованої бібліотеки, включаючи MLlib для машинного навчання. Spark також розроблений для роботи з кластерами Hadoop і може читати широкий тип файлів, включаючи дані Hive, CSV, JSON, дані Casandra та інші.

Навіщо використовувати Spark?

Як майбутній фахівець із обробки даних, ви повинні бути знайомі з відомими бібліотеками Python: Pandas і scikit-learn. Ці дві бібліотеки чудово підходять для вивчення набору даних середнього розміру. Звичайні проекти машинного навчання будуються навколо такої методології:

  • Завантажте дані на диск
  • Імпортуйте дані в пам'ять машини
  • Обробити/проаналізувати дані
  • Побудуйте модель машинного навчання
  • Збережіть прогноз назад на диск

Проблема виникає, якщо фахівець із обробки даних хоче обробити дані, які занадто великі для одного комп’ютера. У перші дні науки про дані практики брали вибірку, оскільки навчання величезним наборам даних не завжди було потрібним. Спеціаліст з обробки даних знайде хорошу статистичну вибірку, виконає додаткову перевірку надійності та створить чудову модель.

Однак з цим є деякі проблеми:

  • Чи відображає набір даних реальний світ?
  • Чи містять дані конкретний приклад?
  • Чи підходить модель для вибірки?

Візьміть, наприклад, рекомендації користувачів. Рекомендатори покладаються на порівняння користувачів з іншими користувачами в оцінці їхніх уподобань. Якщо фахівець із обробки даних бере лише частину даних, не буде когорти користувачів, які дуже схожі один на одного. Рекоменданти повинні працювати на повному наборі даних або взагалі не працювати.

Що таке рішення?

Рішення було очевидним протягом тривалого часу, розділіть проблему на кілька комп’ютерів. Паралельні обчислення також пов’язані з кількома проблемами. Розробники часто мають проблеми з написанням паралельного коду, і їм доводиться вирішувати купу складних проблем, пов’язаних із самою багатопроцесорністю.

Pyspark надає досліднику даних API, який можна використовувати для вирішення проблем паралельного передачі даних. Pyspark справляється зі складністю багатопроцесорної обробки, як-от розповсюдження даних, розповсюдження коду та збір вихідних даних робочих кластерів машин.

Spark може працювати автономно, але найчастіше працює на основі кластерної обчислювальної системи, такої як Hadoop. Проте в тестуванні та розробці науковець з даних може ефективно працювати Spark на своїх розробниках або ноутбуках без кластера

• Одна з головних переваг Spark побудувати архітектуру, яка охоплює керування потоками даних, плавні запити даних, прогнозування машинного навчання та доступ у реальному часі до різноманітних аналізів.

• Spark тісно працює з мовою SQL, тобто структурованими даними. Це дозволяє запитувати дані в режимі реального часу.

• Основна робота спеціаліста з обробки даних полягає в аналізі та створенні прогнозних моделей. Коротше кажучи, фахівець з даних повинен знати, як запитувати дані за допомогою SQL, створити статистичний звіт і використовувати машинне навчання для створення прогнозів. Фахівці з даних витрачають значну частину свого часу на очищення, перетворення та аналіз даних. Після того, як набір даних або робочий процес даних готовий, фахівець з даних використовує різні методи, щоб виявити ідеї та приховані шаблони. Маніпулювання даними має бути надійним і таким же простим у використанні. Spark це правильний інструмент завдяки його швидкості та багатим API.

У цьому PySpark підручник, ви дізнаєтеся, як створити класифікатор за допомогою PySpark приклади

Як встановити PySpark з AWS

Команда Jupyter команда створює образ Docker для запуску Spark ефективно. Нижче наведено кроки, які ви можете виконати, щоб встановити PySpark екземпляр в AWS.

Перегляньте наш підручник AWS та TensorFlow

Крок 1: Створіть екземпляр

Перш за все, вам потрібно створити екземпляр. Перейдіть до свого облікового запису AWS і запустіть екземпляр. Ви можете збільшити пам’ять до 15 г і використовувати ту саму групу безпеки, що й у підручнику TensorFlow.

Крок 2: Відкрийте підключення

Відкрийте підключення та встановіть докер-контейнер. Для отримання додаткової інформації зверніться до підручника з TensorFlow з Docker. Зауважте, що ви повинні бути в правильному робочому каталозі.

Просто запустіть ці коди, щоб установити Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Крок 3: повторно відкрийте підключення та встановіть Spark

Після повторного відкриття з’єднання можна встановити образ, що містить PySpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Крок 4: відкритий Jupyter

Перевірте контейнер і його назву

docker ps

Запустіть докер із журналами докерів, а потім назву докера. Наприклад, журнали докерів zealous_goldwasser

Перейдіть у свій браузер і запустіть Jupyter. Адреса http://localhost:8888/. Вставте пароль, наданий терміналом.

Примітка:: якщо ви хочете завантажити/завантажити файл на свою машину AWS, ви можете скористатися програмним забезпеченням Cyberduck, https://cyberduck.io/.

Як встановити PySpark on Windows/Mac з Conda

Нижче наведено детальний процес встановлення PySpark on Windows/Mac використовує Anaconda:

Для установки Spark на вашій локальній машині рекомендовано створити нове середовище conda. Це нове середовище буде встановлено Python 3.6, Spark і всі залежності.

Користувач Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows користувач

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Ви можете редагувати файл .yml. Будьте обережні з відступом. Перед –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Збережіть це та створіть середовище. Це займає деякий час

conda env create -f hello-spark.yml

Щоб дізнатися більше про розташування, перегляньте підручник «Встановити TensorFlow».

Ви можете перевірити все середовище, встановлене на вашій машині

conda env list
Activate hello-spark

Користувач Mac

source activate hello-spark

Windows користувач

activate hello-spark

Примітка: Ви вже створили спеціальне середовище TensorFlow для запуску навчальних посібників у TensorFlow. Зручніше створити нове середовище, відмінне від hello-tf. Немає сенсу перевантажувати hello-tf Spark або будь-які інші бібліотеки машинного навчання.

Уявіть, що більша частина вашого проекту включає TensorFlow, але вам потрібно використовувати Spark для одного конкретного проекту. Ви можете встановити середовище TensorFlow для всього свого проекту та створити окреме середовище для нього Spark. Ви можете додати скільки завгодно бібліотек Spark середовище, яке ви хочете, не втручаючись у середовище TensorFlow. Коли ви закінчите з Sparkпроект, ви можете стерти його, не впливаючи на середовище TensorFlow.

Jupyter

відкритий Jupyter Блокнот і спробуйте, якщо PySpark працює. У новий зошит вставте наступне PySpark зразок коду:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Якщо відображається помилка, ймовірно, що Java не встановлено на вашій машині. У mac відкрийте термінал і напишіть java -version, якщо є версія java, переконайтеся, що це 1.8. в Windows, перейдіть до програми та перевірте, чи є Java папку. Якщо є Java папку, перевірте це Java 1.8 встановлено. На момент написання цієї статті PySpark не сумісний із Java9 і вище.

Якщо потрібно встановити Java, вам подумати за посиланням і завантажте jdk-8u181-windows-x64.exe

Jupyter

Користувачам Mac рекомендується використовувати `brew.`

brew tap caskroom/versions
brew cask install java8

Перегляньте цей покроковий підручник як встановити Java

Примітка:: Використовуйте видалити, щоб повністю видалити середовище.

 conda env remove -n hello-spark -y

Spark Контекст

SparkКонтекст — це внутрішній механізм, який дозволяє з’єднуватися з кластерами. Якщо ви хочете запустити операцію, вам потрібно a SparkКонтекст.

Створити SparkКонтекст

Перш за все, вам потрібно ініціювати a SparkКонтекст.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Тепер, коли SparkКонтекст готовий, ви можете створити колекцію даних під назвою RDD, Resilient Distributed Dataset. Обчислення в RDD автоматично розпаралелюються в кластері.

nums= sc.parallelize([1,2,3,4])

Ви можете отримати доступ до першого ряду за допомогою take

nums.take(1)
[1]

Ви можете застосувати перетворення до даних за допомогою лямбда-функції. У PySpark у прикладі нижче ви повертаєте квадрат чисел. Це трансформація карти

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLContext

Більш зручний спосіб - використовувати DataFrame. SparkКонтекст уже встановлено, ви можете використовувати його для створення dataFrame. Вам також потрібно оголосити SQLContext

SQLContext дозволяє підключати движок до різних джерел даних. Він використовується для запуску функцій Spark sql.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Тепер у цьому Spark підручник Python, давайте створимо список кортежу. Кожен кортеж міститиме імена людей та їхній вік. Потрібні чотири кроки:

Крок 1) Створіть список кортежів з інформацією

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Крок 2) Створіть RDD

rdd = sc.parallelize(list_p)

Крок 3) Перетворіть кортежі

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Крок 4) Створіть контекст DataFrame

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Якщо ви хочете отримати доступ до типу кожної функції, ви можете використовувати printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Приклад машинного навчання з PySpark

Тепер у вас є коротке уявлення про Spark і SQLContext, ви готові створити свою першу програму машинного навчання.

Нижче наведено кроки для створення програми машинного навчання за допомогою PySpark:

  • Крок 1) Основні операції з PySpark
  • Крок 2) Попередня обробка даних
  • Крок 3) Побудуйте конвеєр обробки даних
  • Крок 4) Побудувати класифікатор: логістика
  • Крок 5) Тренуйтеся та оцінюйте модель
  • Крок 6) Налаштуйте гіперпараметр

У цьому PySpark Підручник з машинного навчання, ми будемо використовувати набір даних для дорослих. Мета цього підручника — навчитися користуватися Pyspark. Щоб отримати додаткові відомості про набір даних, зверніться до цього підручника.

Зауважте, що набір даних незначний, і ви можете подумати, що обчислення займає багато часу. Spark призначений для обробки значної кількості даних. SparkПродуктивність підвищується порівняно з іншими бібліотеками машинного навчання, коли оброблений набір даних збільшується.

Крок 1) Основні операції з PySpark

Перш за все, вам потрібно ініціалізувати SQLContext, який ще не запущений.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

тоді ви можете прочитати файл cvs за допомогою sqlContext.read.csv. Ви використовуєте inferSchema зі значенням True, щоб сказати Spark щоб автоматично вгадати тип даних. За замовчуванням встановлено значення False.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Давайте подивимося на тип даних

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Ви можете переглянути дані за допомогою show.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Якщо ви не встановили для inderShema значення True, ось що відбувається з типом. Є все в рядку.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Щоб перетворити безперервну змінну в правильний формат, ви можете використати переробку стовпців. Ви можете використовувати withColumn, щоб сказати Spark який стовпець виконувати перетворення.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Виберіть стовпці

Ви можете вибрати та показати рядки за допомогою вибору та назв функцій. Нижче вибрано вік і вік.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Порахувати по групах

Якщо ви хочете підрахувати кількість входжень за групою, ви можете ланцюжком:

  • groupBy()
  • рахувати()

разом. У PySpark У прикладі нижче ви підраховуєте кількість рядків за рівнем освіти.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Опишіть дані

Щоб отримати зведену статистику даних, ви можете використовувати describe(). Він обчислить:

  • вважати
  • значити
  • стандартне відхилення
  • хвилин
  • Макс
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Якщо вам потрібна підсумкова статистика лише для одного стовпця, додайте ім’я стовпця всередину describe()

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Перехресні обчислення

У деяких випадках може бути цікаво побачити описову статистику між двома попарними стовпцями. Наприклад, ви можете підрахувати кількість людей із доходом нижче або вище 50 тисяч за рівнем освіти. Ця операція називається перехресною таблицею.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Ви бачите, що в молодості жодна людина не має доходу вище 50 тис.

Колонка скидання

Існує два інтуїтивно зрозумілих API для видалення стовпців:

  • drop(): Відкинути стовпець
  • dropna(): Видалити NA

Нижче ви опускаєте стовпець Education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Фільтрувати дані

Ви можете використовувати filter(), щоб застосувати описову статистику до підмножини даних. Наприклад, можна порахувати кількість людей старше 40 років

df.filter(df.age > 40).count()

13443

Descriptive статистика по групах

Нарешті, ви можете групувати дані за групами та обчислювати статистичні операції, наприклад середнє значення.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Крок 2) Попередня обробка даних

Обробка даних є критично важливим кроком у машинному навчанні. Після видалення сміттєвих даних ви отримаєте деякі важливі відомості.

Наприклад, ви знаєте, що вік не є лінійною функцією доходу. Коли люди молоді, їхні доходи зазвичай нижчі, ніж у середньому віці. Після виходу на пенсію домогосподарство використовує свої заощадження, що означає зменшення доходу. Щоб зафіксувати цей візерунок, ви можете додати квадрат до ознаки віку

Додайте квадрат віку

Щоб додати нову функцію, вам потрібно:

  1. Виберіть стовпець
  2. Застосуйте перетворення та додайте його до DataFrame
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Ви бачите, що age_square успішно додано до кадру даних. Ви можете змінити порядок змінних за допомогою select. Нижче ви наводите age_square відразу після age.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Виключити Голландію-Нідерланди

Коли група в межах об’єкта має лише одне спостереження, це не приносить інформації до моделі. Навпаки, це може призвести до помилки під час перехресної перевірки.

Перевіримо походження домогосподарства

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

Функція native_country містить лише одне домогосподарство з Нідерландів. Ви виключаєте це.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Крок 3) Створіть конвеєр обробки даних

Подібно до scikit-learn, Pyspark має конвеєрний API.

Конвеєр дуже зручний для підтримки структури даних. Ви надсилаєте дані в конвеєр. Усередині конвеєра виконуються різні операції, вихід використовується для живлення алгоритму.

Наприклад, одне універсальне перетворення в машинному навчанні складається з перетворення рядка в один гарячий кодер, тобто один стовпець групою. Один гарячий кодер зазвичай є матрицею, повною нулів.

Етапи перетворення даних дуже схожі на scikit-learn. Тобі потрібно:

  • Індексувати рядок до числового
  • Створіть єдиний гарячий кодер
  • Перетворіть дані

Два API виконують цю роботу: StringIndexer, OneHotEncoder

  1. Перш за все, ви вибираєте стовпець рядка для індексування. InputCol — це ім’я стовпця в наборі даних. outputCol — це нове ім'я, дане перетвореному стовпцю.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Припасуйте дані та трансформуйте їх
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Створіть колонки новин на основі групи. Наприклад, якщо у функції є 10 груп, нова матриця матиме 10 стовпців, по одному для кожної групи.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Побудуйте трубопровід

Ви побудуєте конвеєр для перетворення всіх точних функцій і додавання їх до остаточного набору даних. Конвеєр матиме чотири операції, але ви можете додати скільки завгодно операцій.

  1. Закодуйте категоріальні дані
  2. Індексуйте функцію мітки
  3. Додайте безперервну змінну
  4. Зберіть сходинки.

Кожен крок зберігається у списку з іменами етапів. Цей список вкаже VectorAssembler, яку операцію виконувати всередині конвеєра.

1. Закодуйте категоріальні дані

Цей крок точно такий самий, як наведений вище приклад, за винятком того, що ви циклічно переглядаєте всі категоріальні функції.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Індексуйте функцію мітки

Spark, як і багато інших бібліотек, не приймає рядкові значення для мітки. Ви перетворюєте функцію мітки за допомогою StringIndexer і додаєте її до етапів списку

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Додайте безперервну змінну

InputCols VectorAssembler — це список стовпців. Ви можете створити новий список, який містить усі нові стовпці. Наведений нижче код заповнює список закодованими категоріальними ознаками та безперервними функціями.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Зберіть сходинки.

Нарешті, ви проходите всі кроки у VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Тепер, коли всі кроки готові, ви надсилаєте дані в конвеєр.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Якщо ви перевірите новий набір даних, ви побачите, що він містить усі функції, перетворені та не перетворені. Вас цікавить лише нова етикетка та функції. Функції включають усі перетворені функції та безперервні змінні.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Крок 4) Побудувати класифікатор: логістика

Щоб зробити обчислення швидшим, ви перетворюєте модель на DataFrame.

Вам потрібно вибрати нову мітку та функції з моделі за допомогою карти.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Ви готові створити дані про поїзд як DataFrame. Ви використовуєте sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Перевірте другий ряд

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Створіть набір тренувань/тестів

Ви розділяєте набір даних 80/20 за допомогою randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Давайте порахуємо, скільки людей із доходом нижче/вище 50 тис. у наборі для навчання та тестування

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Побудуйте логістичний регресор

Нарешті, але не менш важливо, ви можете створити класифікатор. Pyspark має API під назвою LogisticRegression для виконання логістичної регресії.

Ви ініціалізуєте lr, вказуючи стовпець міток і стовпці функцій. Ви встановлюєте максимум 10 ітерацій і додаєте параметр регулярізації зі значенням 0.3. Зауважте, що в наступному розділі ви використовуватимете перехресну перевірку за допомогою сітки параметрів для налаштування моделі

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Ви можете побачити коефіцієнти регресії

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Крок 5) Навчіть і оцініть модель

Щоб створити прогноз для вашого тестового набору,

Ви можете використовувати linearModel з transform() для test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Ви можете друкувати елементи у прогнозах

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Вас цікавить мітка, прогноз і ймовірність

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Оцініть модель

Вам потрібно подивитися на показник точності, щоб побачити, наскільки добре (чи погано) працює модель. Наразі не існує API для обчислення міри точності Spark. Значенням за замовчуванням є ROC, робоча характеристика приймача. Це різні показники, які враховують помилкові позитивні результати.

Перш ніж дивитися на ROC, давайте побудуємо міру точності. Ви більше знайомі з цим показником. Показник точності – це сума правильного прогнозу за загальною кількістю спостережень.

Ви створюєте DataFrame з міткою та `prediction.

cm = predictions.select("label", "prediction")

Ви можете перевірити номер класу в мітці та прогноз

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

Наприклад, у тестовому наборі є 1578 домогосподарств з доходом вище 50 тис. і 5021 нижче. Класифікатор, однак, передбачив 617 домогосподарств з доходом вище 50 тис.

Ви можете обчислити точність, обчисливши кількість, коли мітка правильно класифікована за загальною кількістю рядків.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Ви можете об’єднати все разом і написати функцію для обчислення точності.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC метрики

Модуль BinaryClassificationEvaluator включає показники ROC. Приймач OperaХарактеристична крива є ще одним поширеним інструментом, який використовується з бінарною класифікацією. Вона дуже схожа на криву точності/відкликання, але замість графіка точності від відкликання крива ROC показує справжній позитивний коефіцієнт (тобто відкликання) проти хибного позитивного коефіцієнта. Рівень хибнопозитивних результатів – це співвідношення негативних випадків, які неправильно класифікуються як позитивні. Він дорівнює одиниці мінус справжній негативний коефіцієнт. Справжній негативний показник також називають специфічністю. Отже, крива ROC відображає чутливість (запам’ятовування) проти 1 – специфічність

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192площаПід РПЦ

print(evaluator.evaluate(predictions))

0.8940481662695192

Крок 6) Налаштуйте гіперпараметр

І останнє, але не менш важливе, ви можете налаштувати гіперпараметри. Схожий на scikit вчитися ви створюєте сітку параметрів і додаєте параметри, які хочете налаштувати.

Щоб скоротити час обчислення, ви налаштовуєте лише два значення параметра регуляризації.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Нарешті, ви оцінюєте модель за допомогою методу перехресної перевірки з 5 кратністю. Тренування займає близько 16 хвилин.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Час навчання моделі: 978.807 секунд

Найкращий гіперпараметр регуляризації становить 0.01 з точністю 85.316 відсотка.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Ви можете витягнути рекомендований параметр, об’єднавши cvModel.bestModel у ланцюжок з extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Резюме

Spark є основним інструментом для спеціаліста з даних. Це дозволяє фахівцю підключати програму до різних джерел даних, безперебійно виконувати аналіз даних або додавати прогнозну модель.

Почати з Spark, вам потрібно ініціювати a Spark Контекст із:

"SparkContext()'

і і SQL контекст для підключення до джерела даних:

'SQLContext()'

У посібнику ви дізнаєтесь, як навчити логістичну регресію:

  1. Перетворіть набір даних у Dataframe за допомогою:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Зауважте, що ім’я стовпця мітки є newlabel, і всі функції зібрані в функції. Змініть ці значення, якщо вони відрізняються у вашому наборі даних.

  1. Створіть набір тренувань/тестів
randomSplit([.8,.2],seed=1234)
  1. Тренуйте модель
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Зробіть прогноз
linearModel.transform()

Підсумуйте цей пост за допомогою: