/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.mllib.clustering

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

/**
 * Power Iteration Clustering (PIC): clusters points by running power iteration
 * on a row-normalized Gaussian affinity matrix represented as a GraphX graph.
 */
object PIClustering {

  type DVector = Array[Double]
  type DEdge = Edge[Double]
  type LabeledPoint = (VertexId, DVector)
  type Points = Seq[LabeledPoint]
  type DGraph = Graph[Double, Double]
  type IndexedVector = (Long, DVector)

  val DefaultMinNormChange: Double = 1e-11
  val DefaultSigma = 1.0
  val DefaultIterations: Int = 20
  val DefaultMinAffinity = 1e-11

  val LA = SpectralClusteringUsingRdd.Linalg
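
  /**
   * Runs the PIC pipeline: build the row-normalized affinity matrix from the input
   * points, seed each vertex with the degree-based initial vector, turn the sparse
   * affinities into a GraphX graph, and run power iteration to estimate the principal
   * eigenvector.
   *
   * Note: nClusters is accepted but the final clustering of the eigenvector components
   * (e.g. by k-means) is not performed here yet.
   */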
  def cluster(sc: SparkContext,
              points: Points,
              nClusters: Int,
              nIterations: Int = DefaultIterations,
              sigma: Double = DefaultSigma,
              minAffinity: Double = DefaultMinAffinity) = {
    val nVertices = points.length
    val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
    val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
    val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
    val G = createGraphFromEdges(sc, edgesRdd, nVertices, Some(initialVt))
    getPrincipalEigen(sc, G)
  }
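
  /*
   Usage sketch (assumes an existing SparkContext `sc`; the points and the cluster
   count below are illustrative only):

     val points: Points = Seq(
       (0L, Array(1.0, 1.0)),
       (1L, Array(1.2, 0.9)),
       (2L, Array(5.0, 5.2)),
       (3L, Array(5.1, 4.9)))
     val (graph, eigenNorm, eigenVector) = PIClustering.cluster(sc, points, nClusters = 2)
  */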

  /*
   Sample trace of one normalization step:
     vnorm[0]=2.019968019268192
     Updating vertex[0] from 0.2592592592592593 to 0.2597973189724011
     Updating vertex[1] from 0.19753086419753088 to 0.1695805301675885
     Updating vertex[3] from 0.2654320987654321 to 0.27258531045499795
     Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227
  */
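
  /**
   * Builds the initial vertex values: v0(i) = d(i) / sum_j d(j), where d(i) is the i-th
   * row sum of the affinity matrix, so the components of v0 sum to 1.
   */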
  def createInitialVector(sc: SparkContext,
                          labels: Seq[VertexId],
                          rowSums: Seq[Double]) = {
    val volume = rowSums.sum
    val initialVt = labels.zip(rowSums.map(_ / volume))
    initialVt
  }
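
  /**
   * Assembles the GraphX graph from the sparse affinity edges. When an initial vertex
   * vector is supplied it provides the vertex attributes; otherwise vertices are derived
   * from the edges with a placeholder attribute of -1.0.
   */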
  def createGraphFromEdges(sc: SparkContext,
                           edgesRdd: RDD[DEdge],
                           nPoints: Int,
                           optInitialVt: Option[Seq[(VertexId, Double)]] = None) = {
    assert(nPoints > 0, "Must provide number of points from the original dataset")
    val G = if (optInitialVt.isDefined) {
      val initialVt = optInitialVt.get
      val vertsRdd = sc.parallelize(initialVt)
      Graph(vertsRdd, edgesRdd)
    } else {
      Graph.fromEdges(edgesRdd, -1.0)
    }
    G
  }

  val printMatrices = true
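
  /**
   * Power iteration on the affinity graph. Each pass computes W * v by message passing:
   * every edge (i, j) carries W(i, j), so sending attr * dstAttr to the source
   * accumulates sum_j W(i, j) * v(j) at vertex i. The new estimate is then divided by
   * the L1 norm of the previous iterate. Iteration stops after nIterations, or earlier
   * once the "acceleration" of the norm (the change in its rate of change) falls below
   * epsilon. Returns the final graph, the last norm, and the estimated eigenvector.
   */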
  def getPrincipalEigen(sc: SparkContext,
                        G: DGraph,
                        nIterations: Int = DefaultIterations,
                        optMinNormChange: Option[Double] = None
                       ): (DGraph, Double, DVector) = {
    var priorNorm = Double.MaxValue
    var norm = Double.MaxValue
    var priorNormVelocity = Double.MaxValue
    var normVelocity = Double.MaxValue
    var normAccel = Double.MaxValue
    val DummyVertexId = -99L
    var vnorm: Double = -1.0
    var outG: DGraph = null
    var prevG: DGraph = G
    val epsilon = optMinNormChange
      .getOrElse(1e-5 / G.vertices.count())
    for (iter <- 0 until nIterations
         if Math.abs(normAccel) > epsilon) {
      // Multiply W by the current vector: edge (i, j) carries W(i, j), so each source
      // vertex accumulates attr * dstAttr = W(i, j) * v(j). Edges already exist in both
      // directions, so sending to the source alone is sufficient.
      val tmpEigen = prevG.aggregateMessages[Double](
        ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
        _ + _)
      println(s"tmpEigen[$iter]: ${tmpEigen.collect.mkString(",")}\n")
      // L1 norm of the previous iterate, used to rescale the new estimate
      vnorm = prevG.vertices.map(_._2).fold(0.0) { case (sum, dval) =>
        sum + Math.abs(dval)
      }
      println(s"vnorm[$iter]=$vnorm")
      outG = prevG.outerJoinVertices(tmpEigen) { case (vid, wval, optTmpEigJ) =>
        val normedEig = optTmpEigJ.getOrElse {
          println("We got null estimated eigenvector element")
          -1.0
        } / vnorm
        println(s"Updating vertex[$vid] from $wval to $normedEig")
        normedEig
      }
      prevG = outG
      if (printMatrices) {
        val localVertices = outG.vertices.collect
        val graphSize = localVertices.size
        print(s"Vertices[$iter]: ${localVertices.mkString(",")}\n")
      }
      normVelocity = vnorm - priorNorm
      normAccel = normVelocity - priorNormVelocity
      println(s"normAccel[$iter]=$normAccel")
      priorNormVelocity = normVelocity
      priorNorm = vnorm
    }
    (outG, vnorm, outG.vertices.collect.map(_._2))
  }

  //  def printGraph(G: DGraph) = {
  //    val collectedVerts = G.vertices.collect
  //    val nVertices = collectedVerts.length
  //    val msg = s"Graph Vertices:\n${printMatrix(collectedVerts, nVertices, nVertices)}"
  //  }
  //
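
  /**
   * Note: despite its name, scalarDot returns the square root of the dot product of
   * d1 and d2; normVect divides the element-wise product of d1 and d2 by that value.
   */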
  def scalarDot(d1: DVector, d2: DVector) = {
    Math.sqrt(d1.zip(d2).foldLeft(0.0) { case (sum, (d1v, d2v)) =>
      sum + d1v * d2v
    })
  }

  def vectorDot(d1: DVector, d2: DVector) = {
    d1.zip(d2).map { case (d1v, d2v) =>
      d1v * d2v
    }
  }

  def normVect(d1: DVector, d2: DVector) = {
    val scaldot = scalarDot(d1, d2)
    vectorDot(d1, d2).map {
      _ / scaldot
    }
  }

  def readVerticesfromFile(verticesFile: String): Points = {
    import scala.io.Source
    val vertices = Source.fromFile(verticesFile).getLines.map { l =>
      val toks = l.split("\t")
      val arr = toks.slice(1, toks.length).map(_.toDouble)
      (toks(0).toLong, arr)
    }.toSeq
    println(s"Read in ${vertices.length} from $verticesFile")
    //    println(vertices.map { case (x, arr) => s"($x,${arr.mkString(",")})"}
    //      .mkString("[", ",\n", "]"))
    vertices
  }
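
  /**
   * Gaussian similarity between two coordinate vectors:
   * exp(-||c1 - c2||^2 / (2 * sigma^2)), so identical points score 1.0 and the
   * similarity decays toward 0 with distance.
   */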
  def gaussianDist(c1arr: DVector, c2arr: DVector, sigma: Double) = {
    val c1c2 = c1arr.zip(c2arr)
    val dist = Math.exp((-0.5 / Math.pow(sigma, 2.0)) * c1c2.foldLeft(0.0) {
      case (dist: Double, (c1: Double, c2: Double)) =>
        dist + Math.pow(c1 - c2, 2)
    })
    dist
  }
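
  /**
   * Converts the row-indexed affinity matrix into GraphX edges, keeping only entries
   * whose magnitude is at least minAffinity so the edge RDD stays sparse.
   */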
  def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector],
                           minAffinity: Double = DefaultMinAffinity) = {
    val labels = wRdd.map { case (vid, vect) => vid }.collect
    val edgesRdd = wRdd.flatMap { case (vid, vect) =>
      for ((dval, ix) <- vect.zipWithIndex
           if Math.abs(dval) >= minAffinity)
        yield Edge(vid, labels(ix), dval)
    }
    edgesRdd
  }
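
  /**
   * Builds the dense pairwise Gaussian affinity matrix on the driver (zero diagonal),
   * then returns it as an RDD keyed by vertex id with every row divided by its row sum,
   * together with the per-row sums of the unnormalized matrix.
   */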
  def createNormalizedAffinityMatrix(sc: SparkContext, points: Points, sigma: Double) = {
    val nVertices = points.length
    // The matrix is assembled locally on the driver, so plain row sums suffice here
    // (and, unlike accumulators, they may safely be read inside the normalizing map below)
    val rowSums = new Array[Double](nVertices)
    val affinityRddNotNorm = sc.parallelize({
      val ivect = new Array[IndexedVector](nVertices)
      for (i <- 0 until nVertices) {
        ivect(i) = new IndexedVector(points(i)._1, new DVector(nVertices))
        var rsum = 0.0
        for (j <- 0 until nVertices) {
          val dist = if (i != j) {
            gaussianDist(points(i)._2, points(j)._2, sigma)
          } else {
            0.0
          }
          ivect(i)._2(j) = dist
          rsum += dist
        }
        rowSums(i) = rsum
      }
      ivect.zipWithIndex.map { case (vect, ix) =>
        (ix, vect)
      }
    }, nVertices)
    val affinityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) =>
      (vid, vect.map {
        _ / rowSums(rowx)
      })
    }
    (affinityRdd, rowSums.toSeq)
  }

  def norm(vect: DVector): Double = {
    Math.sqrt(vect.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2) })
  }

  def printMatrix(darr: Array[DVector], numRows: Int, numCols: Int): String = {
    val flattenedArr = darr.zipWithIndex.foldLeft(new DVector(numRows * numCols)) {
      case (flatarr, (row, indx)) =>
        System.arraycopy(row, 0, flatarr, indx * numCols, numCols)
        flatarr
    }
    printMatrix(flattenedArr, numRows, numCols)
  }

  def printMatrix(darr: DVector, numRows: Int, numCols: Int): String = {
    val stride = darr.length / numCols
    val sb = new StringBuilder
    // right-justify s within a field of the given length
    def leftJust(s: String, len: Int) = {
      (" " * Math.max(0, len - s.length)) + s
    }
    for (r <- 0 until numRows) {
      for (c <- 0 until numCols) {
        sb.append(leftJust(f"${darr(c * stride + r)}%.6f", 9) + " ")
      }
      sb.append("\n")
    }
    sb.toString
  }

  def printVect(dvect: DVector) = {
    dvect.mkString(",")
  }
}