
Commit d5aae20

committed
Adding Power Iteration Clustering and Suite test
1 parent a3c5fbe commit d5aae20

File tree

2 files changed (+512, -4 lines)

Lines changed: 244 additions & 4 deletions
@@ -1,8 +1,248 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.mllib.clustering

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

/**
 * Power Iteration Clustering: estimates the principal eigenvector of a
 * row-normalized affinity matrix by power iteration over a GraphX graph.
 */
object PIClustering {

  type DVector = Array[Double]
  type DEdge = Edge[Double]
  type LabeledPoint = (VertexId, DVector)
  type Points = Seq[LabeledPoint]
  type DGraph = Graph[Double, Double]
  type IndexedVector = (Long, DVector)

  val DefaultMinNormChange: Double = 1e-11
  val DefaultSigma = 1.0
  val DefaultIterations: Int = 20
  val DefaultMinAffinity = 1e-11

  val LA = SpectralClusteringUsingRdd.Linalg

  def cluster(sc: SparkContext,
              points: Points,
              nClusters: Int,
              nIterations: Int = DefaultIterations,
              sigma: Double = DefaultSigma,
              minAffinity: Double = DefaultMinAffinity) = {
    val nVertices = points.length
    // Row-normalized Gaussian affinity matrix W and its (pre-normalization) row sums
    val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
    // Degree-normalized starting vector v0
    val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
    // Keep only affinities above the threshold so the graph stays sparse
    val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
    val G = createGraphFromEdges(sc, edgesRdd, nVertices, Some(initialVt))
    getPrincipalEigen(sc, G, nIterations)
  }

  /*
  Sample iteration output:
  vnorm[0]=2.019968019268192
  Updating vertex[0] from 0.2592592592592593 to 0.2597973189724011
  Updating vertex[1] from 0.19753086419753088 to 0.1695805301675885
  Updating vertex[3] from 0.2654320987654321 to 0.27258531045499795
  Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227
  */

  def createInitialVector(sc: SparkContext,
                          labels: Seq[VertexId],
                          rowSums: Seq[Double]) = {
    // v0(i) = rowSum(i) / volume: each vertex starts at its normalized degree
    val volume = rowSums.fold(0.0) {
      _ + _
    }
    val initialVt = labels.zip(rowSums.map(_ / volume))
    initialVt
  }

  def createGraphFromEdges(sc: SparkContext,
                           edgesRdd: RDD[DEdge],
                           nPoints: Int,
                           optInitialVt: Option[Seq[(VertexId, Double)]] = None) = {
    assert(nPoints > 0, "Must provide number of points from the original dataset")
    val G = if (optInitialVt.isDefined) {
      // Seed the vertex attributes with the supplied initial vector
      val initialVt = optInitialVt.get
      val vertsRdd = sc.parallelize(initialVt)
      Graph(vertsRdd, edgesRdd)
    } else {
      Graph.fromEdges(edgesRdd, -1.0)
    }
    G
  }

  val printMatrices = true

  def getPrincipalEigen(sc: SparkContext,
                        G: DGraph,
                        nIterations: Int = DefaultIterations,
                        optMinNormChange: Option[Double] = None
                        ): (DGraph, Double, DVector) = {
    var priorNorm = Double.MaxValue
    var norm = Double.MaxValue
    var priorNormVelocity = Double.MaxValue
    var normVelocity = Double.MaxValue
    var normAccel = Double.MaxValue
    val DummyVertexId = -99L
    var vnorm: Double = -1.0
    var outG: DGraph = null
    var prevG: DGraph = G
    val epsilon = optMinNormChange
      .getOrElse(1e-5 / G.vertices.count())
    // Iterate until the change in the norm's rate of change falls below epsilon
    for (iter <- 0 until nIterations
         if Math.abs(normAccel) > epsilon) {
      // Accumulate the edge-weighted contributions into each endpoint
      val tmpEigen = prevG.aggregateMessages[Double](ctx => {
        ctx.sendToSrc(ctx.attr * ctx.srcAttr)
        ctx.sendToDst(ctx.attr * ctx.dstAttr)
      },
        _ + _)
      println(s"tmpEigen[$iter]: ${tmpEigen.collect.mkString(",")}\n")
      // 1-norm of the current vertex values, used to rescale the new estimate
      vnorm =
        prevG.vertices.map { _._2 }.fold(0.0) { case (sum, dval) =>
          sum + Math.abs(dval)
        }
      println(s"vnorm[$iter]=$vnorm")
      outG = prevG.outerJoinVertices(tmpEigen) { case (vid, wval, optTmpEigJ) =>
        val normedEig = optTmpEigJ.getOrElse {
          println("We got null estimated eigenvector element")
          -1.0
        } / vnorm
        println(s"Updating vertex[$vid] from $wval to $normedEig")
        normedEig
      }
      prevG = outG
      if (printMatrices) {
        val localVertices = outG.vertices.collect
        val graphSize = localVertices.size
        print(s"Vertices[$iter]: ${localVertices.mkString(",")}\n")
      }
      normVelocity = vnorm - priorNorm
      normAccel = normVelocity - priorNormVelocity
      println(s"normAccel[$iter]= $normAccel")
      priorNorm = vnorm
      priorNormVelocity = normVelocity
    }
    (outG, vnorm, outG.vertices.collect.map {
      _._2
    })
  }

  //  def printGraph(G: DGraph) = {
  //    val collectedVerts = G.vertices.collect
  //    val nVertices = collectedVerts.length
  //    val msg = s"Graph Vertices:\n${printMatrix(collectedVerts, nVertices, nVertices)}"
  //  }
  //

  // Square root of the dot product of two vectors
  def scalarDot(d1: DVector, d2: DVector) = {
    Math.sqrt(d1.zip(d2).foldLeft(0.0) { case (sum, (d1v, d2v)) =>
      sum + d1v * d2v
    })
  }

  // Element-wise product of two vectors
  def vectorDot(d1: DVector, d2: DVector) = {
    d1.zip(d2).map { case (d1v, d2v) =>
      d1v * d2v
    }
  }

  // Element-wise product scaled by scalarDot(d1, d2)
  def normVect(d1: DVector, d2: DVector) = {
    val scaldot = scalarDot(d1, d2)
    vectorDot(d1, d2).map {
      _ / scaldot
    }
  }

  // Reads tab-separated lines of the form: id <TAB> coord1 <TAB> coord2 ...
  def readVerticesfromFile(verticesFile: String): Points = {
    import scala.io.Source
    val vertices = Source.fromFile(verticesFile).getLines.map { l =>
      val toks = l.split("\t")
      val arr = toks.slice(1, toks.length).map(_.toDouble)
      (toks(0).toLong, arr)
    }.toSeq
    println(s"Read in ${vertices.length} from $verticesFile")
    //    println(vertices.map { case (x, arr) => s"($x,${arr.mkString(",")})"}
    //      .mkString("[", ",\n", "]"))
    vertices
  }

  // Gaussian similarity exp(-||c1 - c2||^2 / (2 * sigma^2)); note the negative exponent
  def gaussianDist(c1arr: DVector, c2arr: DVector, sigma: Double) = {
    val c1c2 = c1arr.zip(c2arr)
    val dist = Math.exp((-0.5 / Math.pow(sigma, 2.0)) * c1c2.foldLeft(0.0) {
      case (dist: Double, (c1: Double, c2: Double)) =>
        dist + Math.pow(c1 - c2, 2)
    })
    dist
  }

  def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector],
                           minAffinity: Double = DefaultMinAffinity) = {
    val labels = wRdd.map { case (vid, vect) => vid }.collect
    // Emit one edge per affinity entry, skipping entries below the threshold
    val edgesRdd = wRdd.flatMap { case (vid, vect) =>
      for ((dval, ix) <- vect.zipWithIndex
           if Math.abs(dval) >= minAffinity)
      yield Edge(vid, labels(ix), dval)
    }
    edgesRdd
  }

  def createNormalizedAffinityMatrix(sc: SparkContext, points: Points, sigma: Double) = {
    val nVertices = points.length
    val rowSums = for (bcx <- 0 until nVertices)
      yield sc.accumulator[Double](0.0, s"RowSums$bcx")
    // The affinity rows are built on the driver inside the parallelize argument
    val affinityRddNotNorm = sc.parallelize({
      val ivect = new Array[IndexedVector](nVertices)
      for (i <- 0 until points.size) {
        ivect(i) = new IndexedVector(points(i)._1, new DVector(nVertices))
        var rsum = 0.0
        for (j <- 0 until points.size) {
          val dist = if (i != j) {
            gaussianDist(points(i)._2, points(j)._2, sigma)
          } else {
            0.0
          }
          ivect(i)._2(j) = dist
          rsum += dist
        }
        rowSums(i) += rsum
      }
      ivect.zipWithIndex.map { case (vect, ix) =>
        (ix, vect)
      }
    }, nVertices)
    // Snapshot the accumulators on the driver; accumulator values cannot be read inside tasks
    val localRowSums = rowSums.map(_.value)
    val affinityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) =>
      (vid, vect.map {
        _ / localRowSums(rowx)
      })
    }
    (affinityRdd, localRowSums)
  }

  def norm(vect: DVector): Double = {
    Math.sqrt(vect.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2) })
  }

  def printMatrix(darr: Array[DVector], numRows: Int, numCols: Int): String = {
    // Flatten the rows into a single row-major array, then print that array
    val flattenedArr = darr.zipWithIndex.foldLeft(new DVector(numRows * numCols)) {
      case (flatarr, (row, indx)) =>
        System.arraycopy(row, 0, flatarr, indx * numCols, numCols)
        flatarr
    }
    printMatrix(flattenedArr, numRows, numCols)
  }

  def printMatrix(darr: DVector, numRows: Int, numCols: Int): String = {
    val stride = darr.length / numCols
    val sb = new StringBuilder
    // Pad on the left so each entry is right-justified in a field of len characters
    def leftJust(s: String, len: Int) = {
      " " * (len - Math.min(len, s.length)) + s
    }
    for (r <- 0 until numRows) {
      for (c <- 0 until numCols) {
        sb.append(leftJust(f"${darr(c * stride + r)}%.6f", 9) + " ")
      }
      sb.append("\n")
    }
    sb.toString
  }

  def printVect(dvect: DVector) = {
    dvect.mkString(",")
  }
}
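
For orientation, here is a minimal sketch of how this work-in-progress API could be exercised from a small driver program. The PIClusteringExample object name, the local[2] master, and the six 2-D points are illustrative assumptions, not part of the commit; note that at this stage cluster() does not yet use nClusters and returns the graph, the final 1-norm, and the estimated principal eigenvector rather than cluster assignments.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.PIClustering

// Hypothetical driver for the WIP API above; the points and settings are made up
// for illustration and assume the PIClustering object (and its
// SpectralClusteringUsingRdd dependency) compiles in the same project.
object PIClusteringExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("PIClusteringExample")
    val sc = new SparkContext(conf)

    // Two well-separated 2-D blobs, labelled 0..5
    val points: PIClustering.Points = Seq(
      (0L, Array(0.0, 0.0)), (1L, Array(0.1, 0.1)), (2L, Array(0.2, 0.0)),
      (3L, Array(9.0, 9.0)), (4L, Array(9.1, 8.9)), (5L, Array(8.9, 9.2)))

    // As of this commit, cluster() runs the power iteration and returns
    // (graph, final 1-norm, estimated principal eigenvector).
    val (graph, vnorm, eigenVector) = PIClustering.cluster(sc, points, nClusters = 2)
    println(s"vnorm=$vnorm eigenvector=${eigenVector.mkString(",")}")

    sc.stop()
  }
}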
