{"id":11731,"date":"2018-03-19T09:00:32","date_gmt":"2018-03-19T13:00:32","guid":{"rendered":"http:\/\/pzd.hmy.temporary.site\/?p=11731"},"modified":"2018-03-18T10:11:38","modified_gmt":"2018-03-18T14:11:38","slug":"multi-class-text-classification-with-pyspark","status":"publish","type":"post","link":"https:\/\/datascienceplus.com\/multi-class-text-classification-with-pyspark\/","title":{"rendered":"Multi-Class Text Classification with PySpark"},"content":{"rendered":"<p>Apache Spark is quickly gaining steam both in the headlines and in real-world adoption, mainly because of its ability to process streaming data. With so much data being processed on a daily basis, it has become essential for us to be able to stream and analyze it in real time. In addition, Apache Spark is fast enough to perform exploratory queries without sampling. Many industry experts have laid out the reasons <a href=\"https:\/\/www.infoworld.com\/article\/3031690\/analytics\/why-you-should-use-spark-for-machine-learning.html\">why you should use Spark for machine learning<\/a>.<\/p>\n<p>So here we are, using the <a href=\"https:\/\/spark.apache.org\/docs\/1.1.0\/mllib-guide.html\">Spark Machine Learning Library<\/a>, in particular its PySpark API, to solve a multi-class text classification problem.<\/p>\n<p>If you would like to see an implementation in <a href=\"http:\/\/scikit-learn.org\/stable\/\">Scikit-Learn<\/a>, read the previous article.<\/p>\n<h2>The Data<\/h2>\n<p>Our task is to classify San Francisco crime descriptions into 33 pre-defined categories. The data can be downloaded from <a href=\"https:\/\/www.kaggle.com\/c\/sf-crime\/data\">Kaggle<\/a>.<\/p>\n<p>When a new crime description comes in, we want to assign it to one of the 33 categories. The classifier assumes that each new crime description belongs to one and only one category. 
This is a multi-class text classification problem.<\/p>\n<ul>\n<li>Input: Descript<\/li>\n<li>Example: \u201cSTOLEN AUTOMOBILE\u201d<\/li>\n<li>Output: Category<\/li>\n<li>Example: VEHICLE THEFT<\/li>\n<\/ul>\n<p>To solve this problem, we will use a variety of feature extraction techniques along with different supervised machine learning algorithms in Spark. Let\u2019s get started!<\/p>\n<h3>Data Ingestion and Extraction<\/h3>\n<p>Loading a CSV file is straightforward with the <a href=\"https:\/\/github.com\/databricks\/spark-csv\">Spark csv package<\/a>.<\/p>\n<pre>\r\nfrom pyspark.sql import SQLContext\r\nfrom pyspark import SparkContext\r\nsc = SparkContext()\r\nsqlContext = SQLContext(sc)\r\ndata = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('train.csv')\r\n<\/pre>\n<p>That\u2019s it! We have loaded the dataset. Let\u2019s start exploring.<\/p>\n<p>Remove the columns we do not need and have a look at the first five rows:<\/p>\n<pre>\r\ndrop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']\r\ndata = data.select([column for column in data.columns if column not in drop_list])\r\ndata.show(5)\r\n<\/pre>\n<p>Gives this output:<br \/>\n<a href=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_1.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_1.png\" alt=\"\" width=\"325\" height=\"178\" class=\"alignnone size-full wp-image-11732\" \/><\/a><br \/>\nApply printSchema() to the data, which will print the schema in a tree format:<\/p>\n<pre>\r\ndata.printSchema()\r\n<\/pre>\n<p>Gives this output:<br \/>\n<a href=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_2.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_2.png\" alt=\"\" width=\"345\" height=\"64\" class=\"alignnone size-full wp-image-11733\" 
\/><\/a><br \/>\nTop 20 crime categories:<\/p>\n<pre>\r\nfrom pyspark.sql.functions import col\r\ndata.groupBy(\"Category\") \\\r\n    .count() \\\r\n    .orderBy(col(\"count\").desc()) \\\r\n    .show()\r\n<\/pre>\n<p>Gives this output:<br \/>\n<a href=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_3.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_3.png\" alt=\"\" width=\"280\" height=\"450\" class=\"alignnone size-full wp-image-11734\" \/><\/a><\/p>\n<p>Top 20 crime descriptions:<\/p>\n<pre>\r\ndata.groupBy(\"Descript\") \\\r\n    .count() \\\r\n    .orderBy(col(\"count\").desc()) \\\r\n    .show()\r\n<\/pre>\n<p>Gives this output:<br \/>\n<a href=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_4.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_4.png\" alt=\"\" width=\"288\" height=\"451\" class=\"alignnone size-full wp-image-11735\" \/><\/a><\/p>\n<h2>Model Pipeline<\/h2>\n<p>The <a href=\"https:\/\/spark.apache.org\/docs\/2.2.0\/ml-pipeline.html\">Spark Machine Learning Pipelines API<\/a> is similar to Scikit-Learn\u2019s. Our pipeline includes three steps:<\/p>\n<ol>\n<li>regexTokenizer: tokenization (with a regular expression)<\/li>\n<li>stopwordsRemover: remove stop words<\/li>\n<li>countVectors: count vectors (\u201cdocument-term vectors\u201d)<\/li>\n<\/ol>\n<pre>\r\nfrom pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer\r\nfrom pyspark.ml.classification import LogisticRegression\r\n# regular expression tokenizer\r\nregexTokenizer = RegexTokenizer(inputCol=\"Descript\", outputCol=\"words\", pattern=\"\\\\W\")\r\n# stop words\r\nadd_stopwords = [\"http\",\"https\",\"amp\",\"rt\",\"t\",\"c\",\"the\"] \r\nstopwordsRemover = StopWordsRemover(inputCol=\"words\", outputCol=\"filtered\").setStopWords(add_stopwords)\r\n# bag of words count\r\ncountVectors = CountVectorizer(inputCol=\"filtered\", outputCol=\"features\", vocabSize=10000, minDF=5)\r\n<\/pre>\n<h2>StringIndexer<\/h2>\n<p>StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.<\/p>\n<p>In our case, the label column (Category) will be encoded to label indices, from 0 to 32; the most frequent label (LARCENY\/THEFT) will be indexed as 0.<\/p>\n<pre>\r\nfrom pyspark.ml import Pipeline\r\nfrom pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler\r\nlabel_stringIdx = StringIndexer(inputCol = \"Category\", outputCol = \"label\")\r\npipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])\r\n# Fit the pipeline to training documents.\r\npipelineFit = pipeline.fit(data)\r\ndataset = pipelineFit.transform(data)\r\ndataset.show(5)\r\n<\/pre>\n<p>Gives this output:<br \/>\n<a href=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_5.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_5-490x103.png\" alt=\"\" width=\"490\" height=\"103\" class=\"alignnone size-medium wp-image-11736\" srcset=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_5-490x103.png 490w, 
https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_5.png 697w\" sizes=\"auto, (max-width: 490px) 100vw, 490px\" \/><\/a><\/p>\n<h3>Partition Training &amp; Test sets<\/h3>\n<pre>\r\n# set seed for reproducibility\r\n(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)\r\nprint(\"Training Dataset Count: \" + str(trainingData.count()))\r\nprint(\"Test Dataset Count: \" + str(testData.count()))\r\n<em>Training Dataset Count: 5185\r\nTest Dataset Count: 2104\r\n<\/em>\r\n<\/pre>\n<h2>Model Training and Evaluation<\/h2>\n<h3>Logistic Regression using Count Vector Features<\/h3>\n<p>Our model will make predictions and be scored on the test set; we then look at the 10 predictions with the highest probability.<\/p>\n<pre>\r\nlr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)\r\nlrModel = lr.fit(trainingData)\r\npredictions = lrModel.transform(testData)\r\npredictions.filter(predictions['prediction'] == 0) \\\r\n    .select(\"Descript\",\"Category\",\"probability\",\"label\",\"prediction\") \\\r\n    .orderBy(\"probability\", ascending=False) \\\r\n    .show(n = 10, truncate = 30)\r\n<\/pre>\n<p>Gives this output:<br \/>\n<a href=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_6.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_6-490x172.png\" alt=\"\" width=\"490\" height=\"172\" class=\"alignnone size-medium wp-image-11737\" srcset=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_6-490x172.png 490w, https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_6.png 668w\" sizes=\"auto, (max-width: 490px) 100vw, 490px\" \/><\/a><\/p>\n<pre>\r\nfrom pyspark.ml.evaluation import MulticlassClassificationEvaluator\r\nevaluator = MulticlassClassificationEvaluator(predictionCol=\"prediction\")\r\nevaluator.evaluate(predictions)\r\n<em>0.9610787444388802\r\n<\/em>\r\n<\/pre>\n<p>The 
score is excellent! (Note that MulticlassClassificationEvaluator reports the weighted F1 score by default, not raw accuracy.)<\/p>\n<h3>Logistic Regression using TF-IDF Features<\/h3>\n<pre>\r\nfrom pyspark.ml.feature import HashingTF, IDF\r\nhashingTF = HashingTF(inputCol=\"filtered\", outputCol=\"rawFeatures\", numFeatures=10000)\r\nidf = IDF(inputCol=\"rawFeatures\", outputCol=\"features\", minDocFreq=5) # minDocFreq: remove sparse terms\r\npipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])\r\npipelineFit = pipeline.fit(data)\r\ndataset = pipelineFit.transform(data)\r\n(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)\r\nlr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)\r\nlrModel = lr.fit(trainingData)\r\npredictions = lrModel.transform(testData)\r\npredictions.filter(predictions['prediction'] == 0) \\\r\n    .select(\"Descript\",\"Category\",\"probability\",\"label\",\"prediction\") \\\r\n    .orderBy(\"probability\", ascending=False) \\\r\n    .show(n = 10, truncate = 30)\r\n<\/pre>\n<p>Gives this output:<br \/>\n<a href=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_7.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_7-490x172.png\" alt=\"\" width=\"490\" height=\"172\" class=\"alignnone size-medium wp-image-11738\" srcset=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_7-490x172.png 490w, https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_7.png 709w\" sizes=\"auto, (max-width: 490px) 100vw, 490px\" \/><\/a><\/p>\n<pre>\r\nevaluator = MulticlassClassificationEvaluator(predictionCol=\"prediction\")\r\nevaluator.evaluate(predictions)\r\n<em>0.9616202660247297\r\n<\/em>\r\n<\/pre>\n<p>The result is essentially the same.<\/p>\n<h3>Cross-Validation<\/h3>\n<p>Let\u2019s now try cross-validation to tune the hyperparameters; we will tune only the Logistic Regression model with count vector features.<\/p>\n<pre>\r\npipeline = Pipeline(stages=[regexTokenizer, 
stopwordsRemover, countVectors, label_stringIdx])\r\npipelineFit = pipeline.fit(data)\r\ndataset = pipelineFit.transform(data)\r\n(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)\r\nlr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)\r\nfrom pyspark.ml.tuning import ParamGridBuilder, CrossValidator\r\n# Create ParamGrid for Cross Validation\r\nparamGrid = (ParamGridBuilder()\r\n             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter\r\n             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)\r\n#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations\r\n#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features\r\n             .build())\r\n# Create 5-fold CrossValidator\r\ncv = CrossValidator(estimator=lr, \\\r\n                    estimatorParamMaps=paramGrid, \\\r\n                    evaluator=evaluator, \\\r\n                    numFolds=5)\r\ncvModel = cv.fit(trainingData)\r\n\r\npredictions = cvModel.transform(testData)\r\n# Evaluate best model\r\nevaluator = MulticlassClassificationEvaluator(predictionCol=\"prediction\")\r\nevaluator.evaluate(predictions)\r\n<em>0.9851796929217101\r\n<\/em>\r\n<\/pre>\n<p>The performance improved.<\/p>\n<h3>Naive Bayes<\/h3>\n<pre>\r\nfrom pyspark.ml.classification import NaiveBayes\r\nnb = NaiveBayes(smoothing=1)\r\nmodel = nb.fit(trainingData)\r\npredictions = model.transform(testData)\r\npredictions.filter(predictions['prediction'] == 0) \\\r\n    .select(\"Descript\",\"Category\",\"probability\",\"label\",\"prediction\") \\\r\n    .orderBy(\"probability\", ascending=False) \\\r\n    .show(n = 10, truncate = 30)\r\n<\/pre>\n<p>Gives this output:<br \/>\n<a href=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_8.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_8-490x170.png\" alt=\"\" 
width=\"490\" height=\"170\" class=\"alignnone size-medium wp-image-11739\" srcset=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_8-490x170.png 490w, https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_8.png 672w\" sizes=\"auto, (max-width: 490px) 100vw, 490px\" \/><\/a><\/p>\n<pre>\r\nevaluator = MulticlassClassificationEvaluator(predictionCol=\"prediction\")\r\nevaluator.evaluate(predictions)\r\n<em>0.9625414629888848\r\n<\/em>\r\n<\/pre>\n<h3>Random Forest<\/h3>\n<pre>\r\nfrom pyspark.ml.classification import RandomForestClassifier\r\nrf = RandomForestClassifier(labelCol=\"label\", \\\r\n                            featuresCol=\"features\", \\\r\n                            numTrees = 100, \\\r\n                            maxDepth = 4, \\\r\n                            maxBins = 32)\r\n# Train model with Training Data\r\nrfModel = rf.fit(trainingData)\r\npredictions = rfModel.transform(testData)\r\npredictions.filter(predictions['prediction'] == 0) \\\r\n    .select(\"Descript\",\"Category\",\"probability\",\"label\",\"prediction\") \\\r\n    .orderBy(\"probability\", ascending=False) \\\r\n    .show(n = 10, truncate = 30)\r\n<\/pre>\n<p>Gives this output:<br \/>\n<a href=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_9.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_9-490x173.png\" alt=\"\" width=\"490\" height=\"173\" class=\"alignnone size-medium wp-image-11740\" srcset=\"https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_9-490x173.png 490w, https:\/\/datascienceplus.com\/wp-content\/uploads\/2018\/03\/text_spark_9.png 669w\" sizes=\"auto, (max-width: 490px) 100vw, 490px\" \/><\/a><\/p>\n<pre>\r\nevaluator = MulticlassClassificationEvaluator(predictionCol=\"prediction\")\r\nevaluator.evaluate(predictions)\r\n<em>0.6600326922344301\r\n<\/em>\r\n<\/pre>\n<p>Random forest is a very good, 
robust, and versatile method; however, it is no mystery that it is not the best choice for high-dimensional sparse data.<\/p>\n<p>Logistic Regression with cross-validation is clearly the model of choice for this experiment.<\/p>\n<p>This brings us to the end of the article. The source code used to create this post can be found on <a href=\"https:\/\/github.com\/susanli2016\/Machine-Learning-with-Python\/blob\/master\/SF_Crime_Text_Classification_PySpark.ipynb\">GitHub<\/a>.<\/p>\n<p>I look forward to hearing any feedback or questions.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Apache Spark is quickly gaining steam both in the headlines and real-world adoption, mainly because of its ability to process streaming data. With so much data being processed on a daily basis, it has become essential for us to be able to stream and analyze it in real time. In addition, Apache Spark is fast [&hellip;]<\/p>\n","protected":false},"author":2359,"featured_media":11749,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[2],"tags":[226,231,283,131],"class_list":["post-11731","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-advanced-modeling","tag-nlp","tag-python","tag-spark","tag-text-mining"],"views":12403,"_links":{"self":[{"href":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/posts\/11731","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/users\/2359"}],"replies":[{"embeddable":true,"href":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/comments?post=11731"}],"version-history":[{"count":0,"href":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/posts\/11731\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"hr
ef":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/media\/11749"}],"wp:attachment":[{"href":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/media?parent=11731"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/categories?post=11731"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/datascienceplus.com\/wp-json\/wp\/v2\/tags?post=11731"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}