@@ -23,8 +23,6 @@ import breeze.linalg.Transpose
2323import org .apache .spark .rdd .RDD
2424import org .apache .spark .mllib .linalg .{Matrices , Matrix , Vector , Vectors }
2525import org .apache .spark .mllib .stat .impl .MultivariateGaussian
26- import org .apache .spark .{Accumulator , AccumulatorParam , SparkContext }
27- import org .apache .spark .SparkContext .DoubleAccumulatorParam
2826
2927import scala .collection .mutable .IndexedSeqView
3028
@@ -86,19 +84,19 @@ class GaussianMixtureModelEM private (
8684 private def computeExpectation (
8785 weights : Array [Double ],
8886 dists : Array [MultivariateGaussian ])
89- (model : ExpectationSum , x : DenseDoubleVector ): ExpectationSum = {
90- val k = model ._2.length
87+ (sums : ExpectationSum , x : DenseDoubleVector ): ExpectationSum = {
88+ val k = sums ._2.length
9189 val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) }
9290 val pSum = p.sum
93- model ._1(0 ) += math.log(pSum)
91+ sums ._1(0 ) += math.log(pSum)
9492 val xxt = x * new Transpose (x)
9593 for (i <- 0 until k) {
9694 p(i) /= pSum
97- model ._2(i) += p(i)
98- model ._3(i) += x * p(i)
99- model ._4(i) += xxt * p(i)
95+ sums ._2(i) += p(i)
96+ sums ._3(i) += x * p(i)
97+ sums ._4(i) += xxt * p(i)
10098 }
101- model
99+ sums
102100 }
103101
104102 // number of samples per cluster to use when initializing Gaussians
@@ -243,41 +241,4 @@ class GaussianMixtureModelEM private (
243241 (0 until ss.length).foreach(i => cov(i,i) = ss(i) / x.length)
244242 cov
245243 }
246-
247- /**
248- * Given the input vectors, return the membership value of each vector
249- * to all mixture components.
250- */
251- def predictClusters (
252- points : RDD [Vector ],
253- mu : Array [Vector ],
254- sigma : Array [Matrix ],
255- weight : Array [Double ], k : Int ): RDD [Array [Double ]] = {
256- val sc = points.sparkContext
257- val dists = sc.broadcast{
258- (0 until k).map{ i =>
259- new MultivariateGaussian (mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
260- }.toArray
261- }
262- val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray)
263- points.map{ x =>
264- computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
265- }
266- }
267-
268- /**
269- * Compute the partial assignments for each vector
270- */
271- private def computeSoftAssignments (
272- pt : DenseDoubleVector ,
273- dists : Array [MultivariateGaussian ],
274- weights : Array [Double ],
275- k : Int ): Array [Double ] = {
276- val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) }
277- val pSum = p.sum
278- for (i <- 0 until k){
279- p(i) /= pSum
280- }
281- p
282- }
283244}
0 commit comments