1717
1818package org .apache .spark .mllib .linalg .distributed
1919
20- import java .util
20+ import java .util . Arrays
2121
2222import breeze .linalg .{Vector => BV , DenseMatrix => BDM , DenseVector => BDV , SparseVector => BSV }
2323import breeze .linalg .{svd => brzSvd , axpy => brzAxpy }
@@ -202,32 +202,23 @@ class RowMatrix(
202202 }
203203
204204 /**
205- * Multiply the Gramian matrix `A^T A` by a DenseVector on the right.
205+ * Multiplies the Gramian matrix `A^T A` by a dense vector on the right without computing `A^T A` .
206206 *
207- * @param v a local DenseVector whose length must match the number of columns of this matrix.
208- * @return a local DenseVector representing the product.
207+ @param v a dense vector whose length must match the number of columns of this matrix
208+ * @return a dense vector representing the product
209209 */
210- private [mllib] def multiplyGramianMatrixBy (v : DenseVector ): DenseVector = {
210+ private [mllib] def multiplyGramianMatrixBy (v : BDV [ Double ] ): BDV [ Double ] = {
211211 val n = numCols().toInt
212- val vbr = rows.context.broadcast(v.toBreeze)
213-
214- val bv = rows.aggregate(BDV .zeros[Double ](n))(
212+ val vbr = rows.context.broadcast(v)
213+ rows.aggregate(BDV .zeros[Double ](n))(
215214 seqOp = (U , r) => {
216215 val rBrz = r.toBreeze
217216 val a = rBrz.dot(vbr.value)
218- rBrz match {
219- case _ : BDV [_] => brzAxpy(a, rBrz.asInstanceOf [BDV [Double ]], U )
220- case _ : BSV [_] => brzAxpy(a, rBrz.asInstanceOf [BSV [Double ]], U )
221- case _ =>
222- throw new UnsupportedOperationException (
223- s " Do not support vector operation from type ${rBrz.getClass.getName}. " )
224- }
217+ brzAxpy(a, rBrz, U .asInstanceOf [BV [Double ]])
225218 U
226219 },
227220 combOp = (U1 , U2 ) => U1 += U2
228221 )
229-
230- Vectors .fromBreeze(bv).asInstanceOf [DenseVector ]
231222 }
232223
233224 /**
@@ -250,49 +241,51 @@ class RowMatrix(
250241 }
251242
252243 /**
253- * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n), this
244+ * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n). This
254245 * will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k
255246 * singular values, U and V contain the corresponding singular vectors.
256247 *
257- * This approach assumes n is smaller than m, and invokes a dense matrix implementation when n is
258- * small (n < 100) or the number of requested singular values is the same as n (k == n). For
259- * problems with large n (n >= 100) and k < n, this approach invokes a sparse matrix
260- * implementation that provides a function to ARPACK to multiply a vector with A'A. It iteratively
261- * calls ARPACK-dsaupd on the master node, from which we recover S and V. Then we compute U via
262- * easy matrix multiplication as U = A * (V * S^{-1}).
263- *
264- * The dense implementation requires `n^2` doubles to fit in memory and `O(n^3)` time on the
265- * master node.
248+ * At most k largest non-zero singular values and associated vectors are returned. If there are k
249+ * such values, then the dimensions of the return will be:
250+ * - U is a RowMatrix of size m x k that satisfies U' * U = eye(k),
251+ * - s is a Vector of size k, holding the singular values in descending order,
252+ * - V is a Matrix of size n x k that satisfies V' * V = eye(k).
266253 *
267- * The sparse implementation requires `n * (6 * k + 4)` doubles to fit in memory on the master
268- * node and approximately `O(k * nnz(A))` time distributed on all worker nodes. There is no
269- * restriction on m (number of rows).
254+ * We assume n is smaller than m. The singular values and the right singular vectors are derived
255+ * from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix
256+ * storing the right singular vectors, is computed via matrix multiplication as
257+ * U = A * (V * S^-1^), if requested by user. The actual method to use is determined
258+ * automatically based on the cost:
259+ * - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian
260+ * matrix first and then compute its top eigenvalues and eigenvectors locally on the driver.
261+ * This requires a single pass with O(n^2^) storage on each executor and on the driver, and
262+ * O(n^2^ k) time on the driver.
263+ * - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to
264+ * compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k)
265+ * passes, O(n) storage on each executor, and O(n k) storage on the driver.
270266 *
271267 * Several internal parameters are set to default values. The reciprocal condition number rCond
272268 * is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where
273269 * sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for
274- * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK
270+ * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK's
275271 * eigen-decomposition is set to 1e-10.
276272 *
277- * At most k largest non-zero singular values and associated vectors are returned.
278- * If there are k such values, then the dimensions of the return will be:
273+ * @note The conditions that decide which method to use internally and the default parameters are
274+ * subject to change.
279275 *
280- * U is a RowMatrix of size m x k that satisfies U'U = eye(k),
281- * s is a Vector of size k, holding the singular values in descending order,
282- * and V is a Matrix of size n x k that satisfies V'V = eye(k).
283- *
284- * @param k number of leading singular values to keep (0 < k <= n). It might return less than
285- * k if there are numerically zero singular values or there are not enough Ritz values
276+ * @param k number of leading singular values to keep (0 < k <= n). It might return less than k if
277+ * there are numerically zero singular values or there are not enough Ritz values
286278 * converged before the maximum number of Arnoldi update iterations is reached (in case
287279 * that matrix A is ill-conditioned).
288- * @param computeU whether to compute U.
280+ * @param computeU whether to compute U
289281 * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
290282 * are treated as zero, where sigma(0) is the largest singular value.
291- * @return SingularValueDecomposition(U, s, V), U = null if computeU = false
283+ * @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
292284 */
293- def computeSVD (k : Int ,
294- computeU : Boolean = false ,
295- rCond : Double = 1e-9 ): SingularValueDecomposition [RowMatrix , Matrix ] = {
285+ def computeSVD (
286+ k : Int ,
287+ computeU : Boolean = false ,
288+ rCond : Double = 1e-9 ): SingularValueDecomposition [RowMatrix , Matrix ] = {
296289 // maximum number of Arnoldi update iterations for invoking ARPACK
297290 val maxIter = math.max(300 , k * 3 )
298291 // numerical tolerance for invoking ARPACK
@@ -301,87 +294,78 @@ class RowMatrix(
301294 }
302295
303296 /**
304- * Actual SVD computation, visible for testing.
297+ * The actual SVD implementation, visible for testing.
298+ *
299+ * @param k number of leading singular values to keep (0 < k <= n)
300+ * @param computeU whether to compute U
301+ * @param rCond the reciprocal condition number
302+ * @param maxIter max number of iterations (if ARPACK is used)
303+ * @param tol termination tolerance (if ARPACK is used)
304+ * @param mode computation mode (auto: determine automatically which mode to use,
305+ * local-svd: compute gram matrix and computes its full SVD locally,
306+ * local-eigs: compute gram matrix and computes its top eigenvalues locally,
307+ * dist-eigs: compute the top eigenvalues of the gram matrix distributively)
308+ * @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
305309 */
306- private [mllib] def computeSVD (k : Int ,
307- computeU : Boolean ,
308- rCond : Double ,
309- maxIter : Int ,
310- tol : Double ,
311- mode : String ): SingularValueDecomposition [RowMatrix , Matrix ] = {
310+ private [mllib] def computeSVD (
311+ k : Int ,
312+ computeU : Boolean ,
313+ rCond : Double ,
314+ maxIter : Int ,
315+ tol : Double ,
316+ mode : String ): SingularValueDecomposition [RowMatrix , Matrix ] = {
312317 val n = numCols().toInt
318+ require(k > 0 && k <= n, s " Request up to n singular values but got k= $k and n= $n. " )
313319
314320 object SVDMode extends Enumeration {
315- val DenseARPACK, DenseLAPACK, SparseARPACK = Value
321+ val LocalARPACK, LocalLAPACK, DistARPACK = Value
316322 }
317323
318- val derivedMode = mode match {
319- case " auto" => if (n < 100 || k == n) {
320- // invoke dense implementation when n is small or k == n (since ARPACK requires k < n)
321- require(k > 0 && k <= n, s " Request up to n singular values k= $k n= $n. " )
322- " dense"
323- } else {
324- // invoke sparse implementation with ARPACK when n is large
325- require(k > 0 && k < n, s " Request up to n - 1 singular values for ARPACK k= $k n= $n. " )
326- " sparse"
327- }
328- case " dense" => " dense"
329- case " sparse" => " sparse"
324+ val computeMode = mode match {
325+ case " auto" =>
326+ // TODO: The conditions below are not fully tested.
327+ if (n < 100 || k > n / 2 ) {
328+ // If n is small or k is large compared with n, we better compute the Gramian matrix first
329+ // and then compute its eigenvalues locally, instead of making multiple passes.
330+ if (k < n / 3 ) {
331+ SVDMode .LocalARPACK
332+ } else {
333+ SVDMode .LocalLAPACK
334+ }
335+ } else {
336+ // If k is small compared with n, we use ARPACK with distributed multiplication.
337+ SVDMode .DistARPACK
338+ }
339+ case " local-svd" => SVDMode .LocalLAPACK
340+ case " local-eigs" => SVDMode .LocalARPACK
341+ case " dist-eigs" => SVDMode .DistARPACK
330342 case _ => throw new IllegalArgumentException (s " Do not support mode $mode. " )
331343 }
332344
333- val computeMode = derivedMode match {
334- case " dense" => if (k < n / 2 ) {
335- // when k is small, call ARPACK
336- require(k > 0 && k < n, s " Request up to n - 1 singular values for ARPACK k= $k n= $n. " )
337- SVDMode .DenseARPACK
338- } else {
339- // when k is large, call LAPACK
340- SVDMode .DenseLAPACK
341- }
342- case " sparse" => SVDMode .SparseARPACK
343- }
344-
345+ // Compute the eigen-decomposition of A' * A.
345346 val (sigmaSquares : BDV [Double ], u : BDM [Double ]) = computeMode match {
346- case SVDMode .DenseARPACK => {
347+ case SVDMode .LocalARPACK =>
348+ require(k < n, s " k must be smaller than n in local-eigs mode but got k= $k and n= $n. " )
347349 val G = computeGramianMatrix().toBreeze.asInstanceOf [BDM [Double ]]
348- def multiplyDenseGramianMatrixBy (v : DenseVector ): DenseVector = {
349- Vectors .fromBreeze(G * v.toBreeze).asInstanceOf [DenseVector ]
350- }
351- EigenValueDecomposition .symmetricEigs(multiplyDenseGramianMatrixBy, n, k, tol, maxIter)
352- }
353- case SVDMode .DenseLAPACK => {
350+ EigenValueDecomposition .symmetricEigs(v => G * v, n, k, tol, maxIter)
351+ case SVDMode .LocalLAPACK =>
354352 val G = computeGramianMatrix().toBreeze.asInstanceOf [BDM [Double ]]
355- val (uFull : BDM [Double ], sigmaSquaresFull : BDV [Double ], vFull : BDM [ Double ] ) = brzSvd(G )
353+ val (uFull : BDM [Double ], sigmaSquaresFull : BDV [Double ], _ ) = brzSvd(G )
356354 (sigmaSquaresFull, uFull)
357- }
358- case SVDMode . SparseARPACK => {
355+ case SVDMode . DistARPACK =>
356+ require(k < n, s " k must be smaller than n in dist-eigs mode but got k= $k and n= $n . " )
359357 EigenValueDecomposition .symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
360- }
361358 }
362359
363- computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquares, u)
364- }
365-
366- /**
367- * Determine effective rank of SVD result and compute left singular vectors if required.
368- */
369- private def computeSVDEffectiveRank (
370- k : Int ,
371- n : Int ,
372- computeU : Boolean ,
373- rCond : Double ,
374- sigmaSquares : BDV [Double ],
375- u : BDM [Double ]): SingularValueDecomposition [RowMatrix , Matrix ] = {
376360 val sigmas : BDV [Double ] = brzSqrt(sigmaSquares)
377361
378- // Determine effective rank.
362+ // Determine the effective rank.
379363 val sigma0 = sigmas(0 )
380364 val threshold = rCond * sigma0
381365 var i = 0
382- // sigmas might have a length smaller than k, if some Ritz values do not satisfy the
383- // convergence criterion specified by tol after maxIterations .
384- // Thus use i < min(k, sigmas.length) instead of i < k
366+ // sigmas might have a length smaller than k, if some Ritz values do not satisfy the convergence
367+ // criterion specified by tol after max number of iterations .
368+ // Thus use i < min(k, sigmas.length) instead of i < k.
385369 if (sigmas.length < k) {
386370 logWarning(s " Requested $k singular values but only found ${sigmas.length} converged. " )
387371 }
@@ -394,12 +378,12 @@ class RowMatrix(
394378 logWarning(s " Requested $k singular values but only found $sk nonzeros. " )
395379 }
396380
397- val s = Vectors .dense(util. Arrays .copyOfRange(sigmas.data, 0 , sk))
398- val V = Matrices .dense(n, sk, util. Arrays .copyOfRange(u.data, 0 , n * sk))
381+ val s = Vectors .dense(Arrays .copyOfRange(sigmas.data, 0 , sk))
382+ val V = Matrices .dense(n, sk, Arrays .copyOfRange(u.data, 0 , n * sk))
399383
400384 if (computeU) {
401385 // N = Vk * Sk^{-1}
402- val N = new BDM [Double ](n, sk, util. Arrays .copyOfRange(u.data, 0 , n * sk))
386+ val N = new BDM [Double ](n, sk, Arrays .copyOfRange(u.data, 0 , n * sk))
403387 var i = 0
404388 var j = 0
405389 while (j < sk) {
@@ -484,7 +468,7 @@ class RowMatrix(
484468 if (k == n) {
485469 Matrices .dense(n, k, u.data)
486470 } else {
487- Matrices .dense(n, k, util. Arrays .copyOfRange(u.data, 0 , n * k))
471+ Matrices .dense(n, k, Arrays .copyOfRange(u.data, 0 , n * k))
488472 }
489473 }
490474
0 commit comments