
Commit f5dbf4d

Teach ALS how to use the NNLS solver.

1 parent 6cb563c commit f5dbf4d

File tree

  • mllib/src/main/scala/org/apache/spark/mllib/recommendation

1 file changed: +193 −2 lines changed

mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala

Lines changed: 193 additions & 2 deletions
@@ -32,6 +32,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
+import org.apache.spark.mllib.optimization.NNLSbyPCG
 
 /**
  * Out-link information for a user or product block. This includes the original user/product IDs
@@ -156,6 +158,18 @@ class ALS private (
     this
   }
 
+  /** If true, do alternating nonnegative least squares. */
+  private var nonnegative = false
+
+  /**
+   * Set whether the least-squares problems solved at each iteration should have
+   * nonnegativity constraints.
+   */
+  def setNonnegative(b: Boolean): ALS = {
+    this.nonnegative = b
+    this
+  }
+
   /**
    * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
    * Returns a MatrixFactorizationModel with feature vectors for each user and product.
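
Note: with this setter in place, nonnegativity can also be toggled on a hand-configured ALS instance. A minimal sketch, assuming the no-arg ALS constructor and the pre-existing setRank/setIterations/setLambda setters in this file (each setter returns the ALS instance, as setNonnegative above does):

    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.rdd.RDD

    // ratings: RDD[Rating] prepared elsewhere
    def trainNonnegative(ratings: RDD[Rating]) =
      new ALS()
        .setRank(10)
        .setIterations(20)
        .setLambda(0.01)
        .setNonnegative(true)  // the setter added in this commit
        .run(ratings)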
@@ -498,10 +512,128 @@ class ALS private (
       (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
       // Solve the resulting matrix, which is symmetric and positive-definite
       if (implicitPrefs) {
-        Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
+        solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index))
+      } else {
+        solveLeastSquares(fullXtX, userXy(index))
+      }
+    }
+  }
+
+  /**
+   * Solve a least squares problem, possibly with nonnegativity constraints, by a modified
+   * projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b.
+   */
+  def solveLSbyPCG(ata: DoubleMatrix, atb: DoubleMatrix, nonnegative: Boolean): Array[Double] = {
+    val n = atb.rows
+    val scratch = new DoubleMatrix(n, 1)
+
+    // find the optimal unconstrained step
+    def steplen(dir: DoubleMatrix, resid: DoubleMatrix): Double = {
+      val top = SimpleBlas.dot(dir, resid)
+      SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch)
+      top / SimpleBlas.dot(scratch, dir)
+    }
+
+    // stopping condition
+    def stop(step: Double, ndir: Double, nx: Double): Boolean = {
+      ((step != step)
+        || (step < 1e-6)
+        || (ndir < 1e-12 * nx))
+    }
+
+    val grad = new DoubleMatrix(n, 1)
+    val x = new DoubleMatrix(n, 1)
+    val dir = new DoubleMatrix(n, 1)
+    val lastdir = new DoubleMatrix(n, 1)
+    val resid = new DoubleMatrix(n, 1)
+    var lastnorm = 0.0
+    var iterno = 0
+    var lastwall = 0
+    var i = 0
+    while (iterno < 40000) {
+      // find the residual
+      SimpleBlas.gemv(1.0, ata, x, 0.0, resid)
+      SimpleBlas.axpy(-1.0, atb, resid)
+      SimpleBlas.copy(resid, grad)
+
+      // project the gradient
+      if (nonnegative) {
+        i = 0
+        while (i < n) {
+          if (grad.data(i) > 0.0 && x.data(i) == 0.0)
+            grad.data(i) = 0.0
+          i = i + 1
+        }
+      }
+      val ngrad = SimpleBlas.dot(grad, grad)
+
+      SimpleBlas.copy(grad, dir)
+
+      // use a CG direction under certain conditions
+      var step = steplen(grad, resid)
+      var ndir = 0.0
+      val nx = SimpleBlas.dot(x, x)
+      if (iterno > lastwall + 1) {
+        val alpha = ngrad / lastnorm
+        SimpleBlas.axpy(alpha, lastdir, dir)
+        val dstep = steplen(dir, resid)
+        ndir = SimpleBlas.dot(dir, dir)
+        if (stop(dstep, ndir, nx)) {
+          // reject the CG step if it could lead to premature termination
+          SimpleBlas.copy(grad, dir)
+          ndir = SimpleBlas.dot(dir, dir)
+        } else {
+          step = dstep
+        }
       } else {
-        Solve.solvePositive(fullXtX, userXy(index)).data
+        ndir = SimpleBlas.dot(dir, dir)
+      }
+
+      // terminate?
+      if (stop(step, ndir, nx)) {
+        return x.data
+      }
+
+      // don't run through the walls
+      if (nonnegative) {
+        i = 0
+        while (i < n) {
+          if (step * dir.data(i) > x.data(i))
+            step = Math.min(step, x.data(i) / dir.data(i))
+          i = i + 1
+        }
+      }
+
+      // take the step
+      i = 0
+      while (i < n) {
+        if (nonnegative) {
+          if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) {
+            x.data(i) = 0
+            lastwall = iterno
+          } else x.data(i) -= step * dir.data(i)
+        } else {
+          x.data(i) -= step * dir.data(i)
+        }
+        i = i + 1
       }
+
+      iterno = iterno + 1
+      SimpleBlas.copy(dir, lastdir)
+      lastnorm = ngrad
+    }
+    x.data
+  }
+
+  /**
+   * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject
+   * to nonnegativity constraints if `nonnegative` is true.
+   */
+  def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix): Array[Double] = {
+    if (!nonnegative) {
+      Solve.solvePositive(ata, atb).data
+    } else {
+      NNLSbyPCG.solve(ata, atb, true)
     }
   }
 
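Note: both solvers above receive the problem in normal-equation form rather than the raw A and b. A quick jblas sketch of that reduction, under the assumption that A has full column rank so that A^T A is positive-definite (the matrices and values here are illustrative only, not part of the patch):

    import org.jblas.{DoubleMatrix, Solve}

    // A small overdetermined system ||Ax - b||_2 with a 3x2 design matrix.
    val A = new DoubleMatrix(Array(Array(1.0, 0.0), Array(1.0, 1.0), Array(1.0, 2.0)))
    val b = new DoubleMatrix(Array(1.0, 2.0, 3.0))

    // The unconstrained minimiser satisfies (A^T A) x = A^T b, so only these
    // two products need to be handed to the solver.
    val ata = A.transpose().mmul(A)
    val atb = A.transpose().mmul(b)

    // Same call the unconstrained branch of solveLeastSquares makes.
    val x = Solve.solvePositive(ata, atb)
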
@@ -532,6 +664,34 @@ class ALS private (
  */
 object ALS {
 
+  /**
+   * Train a matrix factorization model given an RDD of ratings given by users to some products,
+   * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
+   * product of two lower-rank matrices of a given rank (number of features). To solve for these
+   * features, we run a given number of iterations of ALS. This is done using a level of
+   * parallelism given by `blocks`.
+   *
+   * @param ratings RDD of (userID, productID, rating) pairs
+   * @param rank number of features to use
+   * @param iterations number of iterations of ALS (recommended: 10-20)
+   * @param lambda regularization factor (recommended: 0.01)
+   * @param blocks level of parallelism to split computation into
+   * @param seed random seed
+   * @param nonnegative whether to impose nonnegativity constraints
+   */
+  def train(
+      ratings: RDD[Rating],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int,
+      seed: Long,
+      nonnegative: Boolean) = {
+    val als = new ALS(blocks, rank, iterations, lambda, false, 1.0, seed)
+    if (nonnegative) als.setNonnegative(true)
+    als.run(ratings)
+  }
+
   /**
    * Train a matrix factorization model given an RDD of ratings given by users to some products,
    * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
@@ -613,6 +773,37 @@ object ALS {
     train(ratings, rank, iterations, 0.01, -1)
   }
 
+  /**
+   * Train a matrix factorization model given an RDD of 'implicit preferences' given by users
+   * to some products, in the form of (userID, productID, preference) pairs. We approximate the
+   * ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
+   * To solve for these features, we run a given number of iterations of ALS. This is done using
+   * a level of parallelism given by `blocks`.
+   *
+   * @param ratings RDD of (userID, productID, preference) pairs
+   * @param rank number of features to use
+   * @param iterations number of iterations of ALS (recommended: 10-20)
+   * @param lambda regularization factor (recommended: 0.01)
+   * @param blocks level of parallelism to split computation into
+   * @param alpha confidence parameter (only applies when implicitPrefs = true)
+   * @param seed random seed
+   * @param nonnegative whether to impose nonnegativity upon the user and product factors
+   */
+  def trainImplicit(
+      ratings: RDD[Rating],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int,
+      alpha: Double,
+      seed: Long,
+      nonnegative: Boolean
+    ): MatrixFactorizationModel = {
+    new ALS(blocks, rank, iterations, lambda, true, alpha, seed)
+      .setNonnegative(nonnegative)
+      .run(ratings)
+  }
+
   /**
    * Train a matrix factorization model given an RDD of 'implicit preferences' given by users
    * to some products, in the form of (userID, productID, preference) pairs. We approximate the
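
Note: a minimal end-to-end sketch of the two overloads added by this commit, assuming a live SparkContext `sc` and a hypothetical `ratings.csv` of `user,product,rating` lines:

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    val ratings = sc.textFile("ratings.csv").map { line =>
      val Array(user, product, rating) = line.split(',')
      Rating(user.toInt, product.toInt, rating.toDouble)
    }

    // Explicit ratings with nonnegative factors; blocks = -1 picks a default.
    val model = ALS.train(ratings, 10, 20, 0.01, -1, 42L, nonnegative = true)

    // Implicit preferences; alpha is the confidence parameter.
    val implicitModel = ALS.trainImplicit(ratings, 10, 20, 0.01, -1, 40.0, 42L, nonnegative = true)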
