@@ -22,20 +22,21 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.{Logging, SparkContext}
 
 /**
- * This object implements Apriori algorithm using Spark to find frequent item set in the given data set.
+ * This object implements the Apriori algorithm using Spark to find frequent item sets in
+ * the given data set.
  */
 object Apriori extends Logging with Serializable {
 
   /**
-   * Generate the first round FIS(frequent item set) from input data set. Returns single distinct item that
-   * appear greater than minCount times.
-   *
+   * Generate the first round FIS (frequent item set) from the input data set. Returns the
+   * distinct single items that appear at least minCount times.
+   *
    * @param dataSet input data set
    * @param minCount the minimum appearance count, computed from the minimum degree of support
    * @return FIS
    */
   private def genFirstRoundFIS(dataSet: RDD[Set[String]],
-                               minCount: Double): RDD[(Set[String], Int)] = {
+      minCount: Double): RDD[(Set[String], Int)] = {
     dataSet.flatMap(line => line)
       .map(v => (v, 1))
       .reduceByKey(_ + _)
@@ -50,16 +51,16 @@ object Apriori extends Logging with Serializable {
    * @return FIS
    */
   private def scanAndFilter(dataSet: RDD[Set[String]],
-                            candidate: RDD[Set[String]],
-                            minCount: Double,
-                            sc: SparkContext): RDD[(Set[String], Int)] = {
+      candidate: RDD[Set[String]],
+      minCount: Double,
+      sc: SparkContext): RDD[(Set[String], Int)] = {
 
     dataSet.cartesian(candidate).map(x =>
       if (x._2.subsetOf(x._1)) {
         (x._2, 1)
       } else {
         (x._2, 0)
-      }).reduceByKey(_+ _).filter(x => x._2 >= minCount)
+      }).reduceByKey(_ + _).filter(x => x._2 >= minCount)
   }
 
   /**
@@ -69,7 +70,7 @@ object Apriori extends Logging with Serializable {
    * @return candidate FIS
    */
   private def generateCombination(FISk: RDD[Set[String]],
-                                  k: Int): RDD[Set[String]] = {
+      k: Int): RDD[Set[String]] = {
     FISk.cartesian(FISk)
       .map(x => x._1 ++ x._2)
       .filter(x => x.size == k)
@@ -85,15 +86,16 @@ object Apriori extends Logging with Serializable {
    * @return frequent item sets in an array
    */
   def apriori(input: RDD[Array[String]],
-              minSupport: Double,
-              sc: SparkContext): Array[(Set[String], Int)] = {
+      minSupport: Double,
+      sc: SparkContext): Array[(Set[String], Int)] = {
 
     /*
      * This apriori implementation uses the cartesian product of two RDDs: the input data
      * set and the candidate FIS (frequent item set).
      * The resulting FIS are computed in two steps:
      * The first step finds the eligible distinct items in the data set.
-     * The second step, loop in k round, in each round generate candidate FIS and filter out eligible FIS
+     * The second step loops for k rounds; each round generates candidate FIS and filters
+     * out the eligible FIS.
      */
 
     // calculate the minimum appearance count for the minimum degree of support
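
The k-round loop that the comment above describes continues past this hunk, so it is not visible here. As a rough sketch only, not the PR's actual body, the two steps could be wired together roughly as below, assuming minCount is derived as minSupport * dataSet.count() and that the transactions are first turned into sets:

// Hypothetical sketch of the driver loop described in the comment above; the real method
// body continues beyond this hunk, so variable names and loop details are assumptions.
val dataSet = input.map(_.toSet).cache()
val minCount = minSupport * dataSet.count()

// step 1: frequent single items
var FISk = genFirstRoundFIS(dataSet, minCount)
var result = FISk.collect()

// step 2: in round k, combine FIS of size k - 1 into candidates of size k,
// then count and filter them against the data set
var k = 2
while (FISk.count() > 0) {
  val candidates = generateCombination(FISk.map(_._1), k)
  FISk = scanAndFilter(dataSet, candidates, minCount, sc)
  result = result ++ FISk.collect()
  k += 1
}
result  // all frequent item sets found
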
@@ -144,9 +146,9 @@ object Apriori extends Logging with Serializable {
     FIS.collect().foreach(x => print("(" + x._1 + ", " + x._2 + ") "))
     println()
   }
-  
+
   private def printCk(Ck: RDD[Set[String]], k: Int) {
-    print("C" + (k-2) + " size " + Ck.count() + " value: ")
+    print("C" + (k - 2) + " size " + Ck.count() + " value: ")
     Ck.collect().foreach(print)
     println()
   }
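
For reference, a minimal driver sketch showing how the apriori method in this patch might be called; the local SparkContext setup, the sample transactions, and the 0.6 support threshold are illustrative assumptions, not part of the diff.

import org.apache.spark.{SparkConf, SparkContext}

object AprioriExample {
  def main(args: Array[String]): Unit = {
    // local SparkContext purely for the example
    val sc = new SparkContext(new SparkConf().setAppName("AprioriExample").setMaster("local[2]"))

    // each element is one transaction, given as an array of item identifiers
    val transactions = sc.parallelize(Seq(
      Array("bread", "milk"),
      Array("bread", "diaper", "beer"),
      Array("milk", "diaper", "beer"),
      Array("bread", "milk", "diaper"),
      Array("bread", "milk", "beer")))

    // find item sets present in at least 60% of the transactions
    val frequentItemSets = Apriori.apriori(transactions, 0.6, sc)
    frequentItemSets.foreach { case (items, count) =>
      println(items.mkString("{", ", ", "}") + " appears " + count + " times")
    }

    sc.stop()
  }
}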