PySpark 初心者向けチュートリアル: 例で学ぶ

Pyを学ぶ前にSpark、理解しましょう:

Apacheとは何ですか Spark?

Spark は、Hadoop MapReduce よりも簡単かつ高速であることが証明されているビッグ データ ソリューションです。 Spark は、2009 年にカリフォルニア大学バークレー校 RAD ラボによって開発されたオープンソース ソフトウェアです。2010 年に一般公開されて以来、 Spark は人気が高まり、前例のない規模で業界で使用されています。

の時代に ビッグデータ実務者は、データのストリーミングを処理するための高速で信頼性の高いツールをこれまで以上に必要としています。 MapReduce などの以前のツールは人気がありましたが、遅かったです。この問題を克服するには、 Spark は高速かつ汎用的なソリューションを提供します。主な違いは、 Spark そしてMapReduceとは Spark ハードディスク上で実行中にメモリ内で計算を実行します。高速アクセスとデータ処理が可能になり、時間を数時間から数分に短縮します。

パイとは何ですかSpark?

PySpark Apacheによって作成されたツールです Spark 使用のためのコミュニティ Python   SparkRDD(Resilient Distributed Dataset)を Pythonまた、Pyも提供していますSpark リンクするシェル Python API と Spark 開始するコア Spark 環境。 Spark はクラスタコンピューティングを実現するエンジンの名前であり、PySpark is Pythonのライブラリを使用する Spark.

どのように Spark 動作しますか?

Spark は計算エンジンに基づいており、アプリケーションのスケジュール、配布、監視を担当します。各タスクは、コンピューティング クラスターと呼ばれるさまざまなワーカー マシン間で実行されます。コンピューティング クラスターとは、タスクの分割を指します。1 つのマシンが 1 つのタスクを実行し、他のマシンは別のタスクを通じて最終出力に貢献します。最終的に、すべてのタスクが集約されて出力が生成されます。 Spark 管理者がさまざまな機能の 360 度の概要を提供します Spark ジョブ。

どのように Spark Work
どのように Spark Work

Spark で動作するように設計されています

  • Python
  • Java
  • スカラ
  • SQL

重要な特徴は、 Spark 機械学習用のMLlibを含む膨大な量の組み込みライブラリです. Spark また、Hadoop クラスターで動作するように設計されており、Hive データ、CSV、JSON、Casandra データなど、幅広い種類のファイルを読み取ることができます。

なぜ使用 Spark?

将来のデータ実践者として、Python の有名なライブラリである Pandas と scikit-learn に精通している必要があります。これら 2 つのライブラリは、中規模までのデータセットを探索するのに最適です。通常の機械学習プロジェクトは、次の方法論に基づいて構築されます。

  • データをディスクにロードします
  • データをマシンのメモリにインポートします
  • データの処理/分析
  • 機械学習モデルを構築する
  • 予測をディスクに保存し直す

データ サイエンティストが XNUMX 台のコンピューターでは大きすぎるデータを処理したい場合に問題が発生します。 データサイエンスの初期の頃、巨大なデータセットでのトレーニングは必ずしも必要ではなかったので、実践者はサンプリングを行っていました。 データ サイエンティストは、適切な統計サンプルを見つけ、追加の堅牢性チェックを実行して、優れたモデルを考案します。

ただし、これにはいくつかの問題があります。

  • データセットは現実世界を反映していますか?
  • データには具体的な例が含まれていますか?
  • モデルはサンプリングに適していますか?

たとえば、ユーザーの推奨事項を考えてみましょう。 レコメンダーは、ユーザーの好みを評価する際に、ユーザーを他のユーザーと比較することに依存します。 データ担当者がデータのサブセットのみを取得する場合、互いによく似たユーザーのコホートは存在しません。 レコメンダーは完全なデータセットで実行するか、まったく実行しない必要があります。

解決策は何ですか?

解決策は以前から明らかで、問題を複数のコンピューターに分割することです。並列コンピューティングにも複数の問題が伴います。開発者は並列コードの作成に苦労することが多く、最終的にはマルチプロセス処理自体に関する複雑な問題の多くを解決しなければならなくなります。

Pyspark は、データ サイエンティストに、並列データ処理の問題を解決するために使用できる API を提供します。Pyspark は、データの分散、コードの分散、マシン クラスター上のワーカーからの出力の収集など、マルチプロセッシングの複雑な処理を処理します。

Spark スタンドアロンで実行することもできますが、ほとんどの場合、Hadoopなどのクラスタコンピューティングフレームワーク上で実行されます。ただし、テストと開発では、データサイエンティストが効率的に実行できます。 Spark クラスターのない開発ボックスやラップトップで

• 主な利点の 1 つは、 Spark データストリーミング管理、シームレスなデータクエリ、機械学習予測、さまざまな分析へのリアルタイムアクセスを網羅するアーキテクチャを構築することです。

• Spark SQL 言語、つまり構造化データと密接に連携します。これにより、リアルタイムでデータをクエリできます。

• データサイエンティストの主な仕事は、予測モデルを分析して構築することです。 つまり、データ サイエンティストは、次を使用してデータをクエリする方法を知る必要があります。 SQL、統計レポートを作成し、機械学習を利用して予測を生成します。データ サイエンティストは、データのクリーニング、変換、分析に多くの時間を費やします。データセットまたはデータ ワークフローの準備が完了すると、データ サイエンティストはさまざまなテクニックを使用して洞察や隠れたパターンを発見します。データ操作は堅牢でありながら使いやすいものである必要があります。 Spark は、その速度と豊富な API のおかげで、最適なツールです。

このパイではSpark チュートリアルでは、Py を使用して分類器を構築する方法を学びますSpark 例。

Pyのインストール方法Spark AWSを使って

当学校区の Jupyter チームが実行する Docker イメージを構築する Spark 効率的に。以下は、Py をインストールするために実行できる手順です。Spark AWS のインスタンス。

チュートリアルを参照してください AWS および TensorFlow

ステップ 1: インスタンスを作成する

まず、インスタンスを作成する必要があります。 AWS アカウントに移動し、インスタンスを起動します。 ストレージを最大 15g まで増やし、TensorFlow チュートリアルと同じセキュリティ グループを使用できます。

ステップ 2: 接続を開く

接続を開き、Dockerコンテナをインストールします。詳細については、TensorFlowのチュートリアルを参照してください。 デッカー。 正しい作業ディレクトリに存在する必要があることに注意してください。

これらのコードを実行するだけで Docker をインストールできます。

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

ステップ 3: 接続を再度開いてインストールする Spark

接続を再度開いた後、Py を含むイメージをインストールできます。Spark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

ステップ4:開く Jupyter

コンテナとその名前を確認してください

docker ps

Docker ログの後に Docker の名前を付けて Docker を起動します。 たとえば、docker ログ zealous_goldwasser

ブラウザに移動して起動します Jupyter。 アドレスは http://localhost:8888/ です。 端末から指定されたパスワードを貼り付けます。

お願い: AWS マシンにファイルをアップロード/ダウンロードする場合は、Cyber​​duck ソフトウェアを使用できます。 https://cyberduck.io/.

Pyのインストール方法Spark on Windows/Mac と Conda

以下はPyのインストール方法の詳細な手順です。Spark on WindowsAnaconda を使用する /Mac:

インストールするには Spark ローカルマシンで新しいconda環境を作成することをお勧めします。この新しい環境では Python 3.6、 Spark そしてすべての依存関係。

Macユーザー

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows ユーザー

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

.yml ファイルを編集できます。 インデントには注意してください。 – の前に XNUMX つのスペースが必要です

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

保存して環境を作成します。 時間がかかります

conda env create -f hello-spark.yml

場所の詳細については、TensorFlowのインストールのチュートリアルをご覧ください。

マシンにインストールされているすべての環境を確認できます

conda env list
Activate hello-spark

Macユーザー

source activate hello-spark

Windows ユーザー

activate hello-spark

注意: TensorFlow でチュートリアルを実行するための特定の TensorFlow 環境はすでに作成されています。 hello-tf とは別の環境を新たに作成した方が便利です。 hello-tf をオーバーロードするのは意味がありません。 Spark または他の機械学習ライブラリ。

プロジェクトのほとんどに TensorFlow が含まれているが、次を使用する必要があると想像してください。 Spark ある特定のプロジェクトの場合。すべてのプロジェクトに TensorFlow 環境を設定し、プロジェクトごとに別の環境を作成できます。 Spark。ライブラリはいくつでも追加できます Spark TensorFlow 環境を妨げることなく、必要に応じて環境を構築できます。作業が完了したら、 Sparkのプロジェクトは、TensorFlow 環境に影響を与えることなく消去できます。

Jupyter

店は開いています Jupyter Notebook と Py を試してみるSpark 動作します。新しいノートブックに次のPyを貼り付けますSpark サンプルコード:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

エラーが表示された場合は、 Java がマシンにインストールされていません。Macでは、ターミナルを開いてjava -versionと入力し、javaのバージョンがある場合は1.8であることを確認してください。 Windowsアプリケーションにアクセスして、 Java フォルダ。 Java フォルダを確認してください Java 1.8がインストールされています。 この記事の執筆時点では、PySpark と互換性がありません Java9以上。

インストールする必要がある場合 Java、あなたは考える jdk-8u181-windows-x64.exeをダウンロードしてください

Jupyter

Mac ユーザーの場合は、「brew」を使用することをお勧めします。

brew tap caskroom/versions
brew cask install java8

このステップバイステップのチュートリアルを参照してください インストールする方法 Java

お願い: 環境を完全に消去するには、remove を使用します。

 conda env remove -n hello-spark -y

Spark コンテキスト

Sparkコンテキストは、クラスターとの接続を可能にする内部エンジンです。操作を実行するには、 Spark環境。

作る Sparkコンテキスト

まず最初に、 Spark環境。

import pyspark
from pyspark import SparkContext
sc =SparkContext()

今で Sparkコンテキストの準備ができたら、RDD (Resilient Distributed Dataset) と呼ばれるデータのコレクションを作成できます。RDD での計算は、クラスター全体で自動的に並列化されます。

nums= sc.parallelize([1,2,3,4])

take を使用して最初の行にアクセスできます

nums.take(1)
[1]

ラムダ関数を使用してデータに変換を適用できます。パイでSpark 以下の例では、nums の 2 乗を返します。マップ変換です

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLコンテキスト

より便利な方法は、DataFrame を使用することです。 Sparkコンテキストはすでに設定されているため、それを使用してデータフレームを作成できます。 SQLContext を宣言する必要もあります

SQLContext により、エンジンをさまざまなデータ ソースに接続できます。の機能を開始するために使用されます。 Spark SQL

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

今これで Spark チュートリアル Pythonタプルのリストを作成しましょう。各タプルには、人の名前と年齢が含まれます。必要な手順は次の 4 つです。

ステップ1) 情報を含むタプルのリストを作成します

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

ステップ2) RDDを構築する

rdd = sc.parallelize(list_p)

ステップ3) タプルを変換する

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

ステップ4) DataFrame コンテキストを作成する

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

各機能のタイプにアクセスしたい場合は、printSchema() を使用できます。

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Py を使用した機械学習の例Spark

簡単なアイデアを理解したところで、 Spark SQLContext を使用すると、最初の機械学習プログラムを構築する準備が整います。

以下はPyで機械学習プログラムを構築する手順です。Spark:

  • ステップ1) Pyを使った基本操作Spark
  • ステップ2) データ前処理
  • ステップ3) データ処理パイプラインを構築する
  • ステップ4) 分類器を構築します: ロジスティック
  • ステップ5) モデルのトレーニングと評価
  • ステップ6) ハイパーパラメータを調整する

このパイではSpark 機械学習チュートリアルでは、成人向けデータセットを使用します。このチュートリアルの目的は、Pyspark の使用方法を学ぶことです。データセットの詳細については、このチュートリアルを参照してください。

データセットは重要ではないため、計算には長い時間がかかると思われるかもしれないことに注意してください。 Spark かなりの量のデータを処理するように設計されています。 Sparkのパフォーマンスは、処理されるデータセットが大きくなると、他の機械学習ライブラリと比較して向上します。

ステップ1) Pyを使った基本操作Spark

まず、SQLContext がまだ初期化されていないため、初期化する必要があります。

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

その後、sqlContext.read.csv を使用して cvs ファイルを読み取ることができます。 True に設定された inferSchema を使用して通知します Spark データの種類を自動的に推測します。デフォルトでは、False に設定されています。

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

データ型を見てみましょう

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

データはshowで見ることができます。

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

inderShema を True に設定しなかった場合、型に何が起こるかは次のとおりです。 すべて文字列にあります。

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

連続変数を正しい形式に変換するには、列の再キャストを使用します。 withColumn を使用して伝えることができます Spark 変換を実行する列。

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

列を選択

select で行を選択し、機能の名前を表示できます。 以下では、年齢とfnlwgtが選択されています。

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

グループごとにカウントする

グループごとに出現数をカウントしたい場合は、次のように連鎖させます。

  • groupBy()
  • カウント()

一緒に。パイでSpark 以下の例では、教育レベルごとに行数をカウントします。

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

データの説明

データの概要統計を取得するには、describe() を使用できます。 それは以下を計算します:

  • カウント
  • 意味する
  • 標準偏差
  • マックス
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

XNUMX つの列のみの概要統計が必要な場合は、describe() 内に列の名前を追加します。

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

クロス集計の計算

場合によっては、50 つのペアの列間の記述統計を表示すると興味深いことがあります。たとえば、教育レベル別に収入が XNUMX ドル未満または XNUMX ドルを超える人の数をカウントできます。この操作はクロス集計と呼ばれます。

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

若いときに収入が50万を超える人はいないことがわかります。

ドロップカラム

列を削除するには XNUMX つの直感的な API があります。

  • drop(): 列を削除します
  • dropna(): NA を削除します

その下に列 education_num をドロップします。

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

データをフィルタリングする

filter() を使用すると、データのサブセットに記述統計を適用できます。 たとえば、40 歳以上の人の数を数えることができます。

df.filter(df.age > 40).count()

13443

Descriptグループ別の統計

最後に、データをグループごとにグループ化し、平均などの統計演算を計算できます。

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

ステップ 2) データの前処理

データ処理は機械学習における重要なステップです。 不要なデータを削除すると、いくつかの重要な洞察が得られます。

たとえば、年齢と収入が一次関数ではないことはご存知でしょう。 人々が若いとき、彼らの収入は通常、中年よりも低くなります。 退職後、家計は貯蓄を取り崩し、収入が減少します。 このパターンを捉えるには、年齢特徴に正方形を追加します。

年齢スクエアを追加

新しい機能を追加するには、次のことを行う必要があります。

  1. 列を選択します
  2. 変換を適用して DataFrame に追加します
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

age_square がデータ フレームに正常に追加されたことがわかります。 select を使用して変数の順序を変更できます。 以下では、age の直後に age_square を持ってきています。

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

オランダ - オランダを除く

特徴内のグループに観測値が XNUMX つしかない場合、モデルには情報がもたらされません。 逆に、相互検証中にエラーが発生する可能性があります。

世帯の由来を確認してみよう

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

フィーチャnative_countryには、オランダからの世帯がXNUMXつだけ含まれています。 あなたはそれを除外します。

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

ステップ 3) データ処理パイプラインを構築する

scikit-learn と同様に、Pyspark にはパイプライン API があります。

パイプラインは、データの構造を維持するのに非常に便利です。データをパイプラインにプッシュします。パイプライン内では、さまざまな操作が実行され、出力がアルゴリズムに供給されます。

たとえば、機械学習における XNUMX つの汎用変換は、文字列を XNUMX つのホット エンコーダー、つまりグループごとに XNUMX つの列に変換することで構成されます。 XNUMX つのホット エンコーダは通常、ゼロでいっぱいの行列です。

データを変換する手順は、scikit-learn と非常に似ています。 必要がある:

  • 文字列を数値にインデックス付けします
  • ワンホットエンコーダーを作成する
  • データを変換する

XNUMX つの API がその役割を果たします: StringIndexer、OneHotEncoder

  1. まず、インデックスを作成する文字列列を選択します。 inputCol はデータセット内の列の名前です。 OutputCol は、変換された列に与えられた新しい名前です。
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. データを当てはめて変換する
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. グループに基づいてニュースコラムを作成します。 たとえば、フィーチャ内に 10 個のグループがある場合、新しいマトリックスにはグループごとに 10 つずつ、合計 XNUMX 個の列が含まれます。
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

パイプラインを構築する

すべての正確な機能を変換し、最終的なデータセットに追加するパイプラインを構築します。パイプラインには 4 つの操作が含まれますが、必要な数だけ操作を追加できます。

  1. カテゴリデータをエンコードする
  2. ラベル フィーチャのインデックスを作成する
  3. 連続変数を追加する
  4. ステップを組み立てます。

各ステップは、stages という名前のリストに保存されます。このリストは、パイプライン内で実行する操作を VectorAssembler に指示します。

1. カテゴリデータをエンコードする

このステップは、すべてのカテゴリ特徴量をループすることを除いて、上の例とまったく同じです。

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. ラベル フィーチャのインデックスを作成する

Sparkは、他の多くのライブラリと同様に、ラベルの文字列値を受け入れません。 StringIndexer を使用してラベル機能を変換し、リスト ステージに追加します。

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. 連続変数の追加

VectorAssembler の inputCols は列のリストです。 すべての新しい列を含む新しいリストを作成できます。 以下のコードは、エンコードされたカテゴリ特徴と連続特徴をリストに追加します。

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. ステップを組み立てます。

最後に、VectorAssembler のすべてのステップを渡します。

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

すべての手順の準備が完了したので、データをパイプラインにプッシュします。

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

新しいデータセットを確認すると、変換されたフィーチャと変換されていないフィーチャがすべて含まれていることがわかります。 あなたは新しいラベルと機能だけに興味があります。 特徴には、変換されたすべての特徴と連続変数が含まれます。

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

ステップ 4) 分類器を構築する: ロジスティック

計算を高速化するには、モデルを DataFrame に変換します。

マップを使用してモデルから新しいラベルとフィーチャを選択する必要があります。

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

これで、トレイン データを DataFrame として作成する準備ができました。 sqlContextを使用します

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

XNUMX行目を確認してください

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

トレーニング/テスト セットを作成する

データセットをrandomSplitで80/20に分割します。

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

トレーニング セットとテスト セットの両方で、収入が 50 未満または XNUMX を超える人の数を数えてみましょう。

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

ロジスティック回帰分析を構築する

最後に、分類器を構築できます。Pyspark には、ロジスティック回帰を実行するための LogisticRegression という API があります。

lr を初期化するには、ラベル列と特徴列を指定します。 最大 10 回の反復を設定し、値 0.3 の正則化パラメーターを追加します。 次のセクションでは、パラメーター グリッドとの相互検証を使用してモデルを調整することに注意してください。

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#回帰式から係数を確認できます

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

ステップ 5) モデルのトレーニングと評価

テストセットの予測を生成するには、

test_dataのtransform()でlinearModelを使用できます

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

予測内の要素を出力できます

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

ラベル、予測、確率に興味があります

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

モデルを評価する

モデルのパフォーマンスがどの程度優れているか (または悪いか) を確認するには、精度メトリクスを確認する必要があります。現在、精度測定を計算するための API はありません。 Sparkデフォルト値は ROC (受信者動作特性曲線) です。これは、偽陽性率を考慮した別の指標です。

ROC を確認する前に、精度の尺度を構築しましょう。 この指標の方がよく知られています。 精度の尺度は、観測値の合計に対する正しい予測の合計です。

ラベルと「prediction」を使用して DataFrame を作成します。

cm = predictions.select("label", "prediction")

ラベル内のクラス数と予測を確認できます

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

たとえば、テスト セットには、収入が 1578 を超える世帯が 50 世帯あり、それ以下の世帯が 5021 世帯あります。 ただし、分類子は、収入が 617 を超える 50 世帯を予測しました。

ラベルが行の合計数に対して正しく分類されたときの数を計算することで、精度を計算できます。

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

すべてをラップして、精度を計算する関数を作成できます。

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROCメトリクス

モジュール BinaryClassificationEvaluator には ROC 測定が含まれています。受信機 Opera特性曲線は、二項分類で使用されるもう 1 つの一般的なツールです。これは適合率/再現率曲線に非常に似ていますが、ROC 曲線は適合率と再現率をプロットする代わりに、真陽性率 (つまり再現率) と偽陽性率を示します。偽陽性率は、誤って陽性として分類された陰性インスタンスの割合です。これは、XNUMX から真陰性率を引いたものに等しくなります。真陰性率は特異度とも呼ばれます。したがって、ROC 曲線は感度 (再現率) と XNUMX – 特異度をプロットします。

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192面積ROC下

print(evaluator.evaluate(predictions))

0.8940481662695192

ステップ 6) ハイパーパラメータを調整する

最後になりましたが、ハイパーパラメータを調整できます。 に似ている scikitlearn パラメーター グリッドを作成し、調整するパラメーターを追加します。

計算時間を短縮するには、XNUMX つの値のみを使用して正則化パラメーターを調整するだけです。

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

最後に、5 分割の交差検証法を使用してモデルを評価します。 トレーニングには約16分かかります。

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

モデルのトレーニング時間: 978.807 秒

最適な正則化ハイパーパラメータは 0.01 で、精度は 85.316 パーセントです。

accuracy_m(model = cvModel)
Model accuracy: 85.316%

cvModel.bestModel を extractParamMap() と連結することで、推奨パラメーターを抽出できます。

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

製品概要

Spark はデータサイエンティストにとって基本的なツールです。これにより、実務者はアプリをさまざまなデータ ソースに接続したり、データ分析をシームレスに実行したり、予測モデルを追加したりすることができます。

まずは Sparkを開始する必要があります。 Spark コンテキスト:

'Sparkコンテクスト()'

および SQL データ ソースに接続するためのコンテキスト:

'SQLContext()'

このチュートリアルでは、ロジスティック回帰をトレーニングする方法を学びます。

  1. 次のコマンドを使用して、データセットをデータフレームに変換します。
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

ラベルの列名は newlabel であり、すべての特徴が features に集められていることに注意してください。 データセットでこれらの値が異なる場合は、変更します。

  1. トレーニング/テスト セットを作成する
randomSplit([.8,.2],seed=1234)
  1. モデルを訓練する
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. 予測する
linearModel.transform()