Skip to content

Commit 227ad66

Browse files
committed
Moved prediction methods into model class.
1 parent 308c8ad commit 227ad66

File tree

2 files changed (+53 additions, −48 deletions)

2 files changed

+53
-48
lines changed

mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717

1818
package org.apache.spark.mllib.clustering
1919

20+
import breeze.linalg.{DenseVector => BreezeVector}
21+
2022
import org.apache.spark.rdd.RDD
2123
import org.apache.spark.mllib.linalg.Matrix
2224
import org.apache.spark.mllib.linalg.Vector
25+
import org.apache.spark.mllib.stat.impl.MultivariateGaussian
2326

2427
/**
2528
* Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points
@@ -42,9 +45,50 @@ class GaussianMixtureModel(
4245

4346
/** Maps given points to their cluster indices. */
def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = {
  // Soft assignments: one membership weight per mixture component, per point.
  val memberships = predictMembership(points, mu, sigma, weight, k)
  // Hard label = index of the component with the largest membership weight.
  val labels = memberships.map(m => m.indexOf(m.max))
  (memberships, labels)
}
52+
53+
/**
 * Given the input vectors, return the membership value of each vector
 * to all mixture components (normalized soft assignments).
 *
 * @param points input vectors to score
 * @param mu     mean vector of each Gaussian component
 * @param sigma  covariance matrix of each Gaussian component
 * @param weight mixing weight of each Gaussian component
 * @param k      number of mixture components
 * @return for each point, an array of k membership probabilities
 */
def predictMembership(
    points: RDD[Vector],
    mu: Array[Vector],
    sigma: Array[Matrix],
    weight: Array[Double], k: Int): RDD[Array[Double]] = {
  val sc = points.sparkContext
  // Broadcast the per-component distributions once instead of capturing
  // the (mu, sigma) arrays in every task closure.
  val dists = sc.broadcast {
    (0 until k).map { i =>
      new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
    }.toArray
  }
  // Broadcast a defensive copy of the first k weights.
  // (weight.take(k) replaces the element-by-element (0 until k).map copy.)
  val weights = sc.broadcast(weight.take(k))
  points.map { x =>
    computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
  }
}
73+
74+
// "eps" is the minimum likelihood density credited to any point in every
// cluster; it keeps the normalizing sum strictly positive so outlier
// points cannot trigger a divide-by-zero.
private val eps = math.pow(2.0, -52)

/**
 * Compute the normalized partial (soft) assignments of one vector
 * to the k mixture components.
 */
private def computeSoftAssignments(
    pt: BreezeVector[Double],
    dists: Array[MultivariateGaussian],
    weights: Array[Double],
    k: Int): Array[Double] = {
  // Unnormalized weighted likelihood of pt under each component.
  val densities = weights.zip(dists).map { case (w, dist) => eps + w * dist.pdf(pt) }
  val total = densities.sum
  // Normalize the first k entries in place so they sum to 1.
  var i = 0
  while (i < k) {
    densities(i) /= total
    i += 1
  }
  densities
}
5094
}

mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala

Lines changed: 7 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import breeze.linalg.Transpose
2323
import org.apache.spark.rdd.RDD
2424
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
2525
import org.apache.spark.mllib.stat.impl.MultivariateGaussian
26-
import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext}
27-
import org.apache.spark.SparkContext.DoubleAccumulatorParam
2826

2927
import scala.collection.mutable.IndexedSeqView
3028

@@ -86,19 +84,19 @@ class GaussianMixtureModelEM private (
8684
/**
 * Fold one sample vector into the running expectation sums (E-step).
 *
 * `sums` packs (log-likelihood accumulator, component weight sums, mean
 * sums, outer-product sums) for the k components; it is mutated in place
 * and returned so it composes with aggregate-style folds.
 *
 * NOTE(review): reconstructed post-commit state from a garbled diff span
 * (old `model` / new `sums` parameter lines were interleaved).
 */
private def computeExpectation(
    weights: Array[Double],
    dists: Array[MultivariateGaussian])
    (sums: ExpectationSum, x: DenseDoubleVector): ExpectationSum = {
  val k = sums._2.length
  // eps floor prevents log(0) and divide-by-zero for outlier points.
  val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) }
  val pSum = p.sum
  sums._1(0) += math.log(pSum)
  val xxt = x * new Transpose(x)
  for (i <- 0 until k) {
    p(i) /= pSum
    sums._2(i) += p(i)
    sums._3(i) += x * p(i)
    sums._4(i) += xxt * p(i)
  }
  sums
}
103101

104102
// number of samples per cluster to use when initializing Gaussians
@@ -243,41 +241,4 @@ class GaussianMixtureModelEM private (
243241
(0 until ss.length).foreach(i => cov(i,i) = ss(i) / x.length)
244242
cov
245243
}
246-
247-
/**
248-
* Given the input vectors, return the membership value of each vector
249-
* to all mixture components.
250-
*/
251-
def predictClusters(
252-
points: RDD[Vector],
253-
mu: Array[Vector],
254-
sigma: Array[Matrix],
255-
weight: Array[Double], k: Int): RDD[Array[Double]] = {
256-
val sc = points.sparkContext
257-
val dists = sc.broadcast{
258-
(0 until k).map{ i =>
259-
new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
260-
}.toArray
261-
}
262-
val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray)
263-
points.map{ x =>
264-
computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
265-
}
266-
}
267-
268-
/**
269-
* Compute the partial assignments for each vector
270-
*/
271-
private def computeSoftAssignments(
272-
pt: DenseDoubleVector,
273-
dists: Array[MultivariateGaussian],
274-
weights: Array[Double],
275-
k: Int): Array[Double] = {
276-
val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) }
277-
val pSum = p.sum
278-
for (i <- 0 until k){
279-
p(i) /= pSum
280-
}
281-
p
282-
}
283244
}

0 commit comments

Comments
 (0)