@@ -32,6 +32,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
+import org.apache.spark.mllib.optimization.NNLSbyPCG
 
 /**
  * Out-link information for a user or product block. This includes the original user/product IDs
@@ -156,6 +158,18 @@ class ALS private (
     this
   }
 
+  /** If true, do alternating nonnegative least squares. */
+  private var nonnegative = false
+
+  /**
+   * Set whether the least-squares problems solved at each iteration should have
+   * nonnegativity constraints.
+   */
+  def setNonnegative(b: Boolean): ALS = {
+    this.nonnegative = b
+    this
+  }
+
   /**
    * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
    * Returns a MatrixFactorizationModel with feature vectors for each user and product.
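For context, a minimal sketch of how a caller might turn the constraint on, assuming the zero-argument ALS constructor and the chainable setRank/setIterations/setLambda setters defined elsewhere in this class; the ratings RDD is hypothetical:

    // Hypothetical usage of the new setter; ratings: RDD[Rating] is assumed to exist.
    val als = new ALS()
      .setRank(10)
      .setIterations(20)
      .setLambda(0.01)
      .setNonnegative(true) // added by this change
    val model = als.run(ratings)
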
@@ -498,10 +512,128 @@ class ALS private (
       (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
       // Solve the resulting matrix, which is symmetric and positive-definite
       if (implicitPrefs) {
-        Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
+        solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index))
+      } else {
+        solveLeastSquares(fullXtX, userXy(index))
+      }
+    }
+  }
+
+  /**
+   * Solve a least squares problem, possibly with nonnegativity constraints, by a modified
+   * projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b.
+   */
+  def solveLSbyPCG(ata: DoubleMatrix, atb: DoubleMatrix, nonnegative: Boolean): Array[Double] = {
+    val n = atb.rows
+    val scratch = new DoubleMatrix(n, 1)
+
+    // find the optimal unconstrained step
+    def steplen(dir: DoubleMatrix, resid: DoubleMatrix): Double = {
+      val top = SimpleBlas.dot(dir, resid)
+      SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch)
+      top / SimpleBlas.dot(scratch, dir)
+    }
+
+    // stopping condition
+    def stop(step: Double, ndir: Double, nx: Double): Boolean = {
+      ((step != step)           // step is NaN
+        || (step < 1e-6)
+        || (ndir < 1e-12 * nx))
+    }
+
+    val grad = new DoubleMatrix(n, 1)
+    val x = new DoubleMatrix(n, 1)
+    val dir = new DoubleMatrix(n, 1)
+    val lastdir = new DoubleMatrix(n, 1)
+    val resid = new DoubleMatrix(n, 1)
+    var lastnorm = 0.0
+    var iterno = 0
+    var lastwall = 0
+    var i = 0
+    while (iterno < 40000) {
+      // find the residual
+      SimpleBlas.gemv(1.0, ata, x, 0.0, resid)
+      SimpleBlas.axpy(-1.0, atb, resid)
+      SimpleBlas.copy(resid, grad)
+
+      // project the gradient
+      if (nonnegative) {
+        i = 0
+        while (i < n) {
+          if (grad.data(i) > 0.0 && x.data(i) == 0.0) {
+            grad.data(i) = 0.0
+          }
+          i = i + 1
+        }
+      }
+      val ngrad = SimpleBlas.dot(grad, grad)
+
+      SimpleBlas.copy(grad, dir)
+
+      // use a CG direction under certain conditions
+      var step = steplen(grad, resid)
+      var ndir = 0.0
+      val nx = SimpleBlas.dot(x, x)
+      if (iterno > lastwall + 1) {
+        val alpha = ngrad / lastnorm
+        SimpleBlas.axpy(alpha, lastdir, dir)
+        val dstep = steplen(dir, resid)
+        ndir = SimpleBlas.dot(dir, dir)
+        if (stop(dstep, ndir, nx)) {
+          // reject the CG step if it could lead to premature termination
+          SimpleBlas.copy(grad, dir)
+          ndir = SimpleBlas.dot(dir, dir)
+        } else {
+          step = dstep
+        }
       } else {
-        Solve.solvePositive(fullXtX, userXy(index)).data
+        ndir = SimpleBlas.dot(dir, dir)
+      }
+
+      // terminate?
+      if (stop(step, ndir, nx)) {
+        return x.data
+      }
+
+      // don't run through the walls
+      if (nonnegative) {
+        i = 0
+        while (i < n) {
+          if (step * dir.data(i) > x.data(i)) {
+            step = Math.min(step, x.data(i) / dir.data(i))
+          }
+          i = i + 1
+        }
+      }
+
+      // take the step
+      i = 0
+      while (i < n) {
+        if (nonnegative) {
+          if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) {
+            x.data(i) = 0
+            lastwall = iterno
+          } else {
+            x.data(i) -= step * dir.data(i)
+          }
+        } else {
+          x.data(i) -= step * dir.data(i)
+        }
+        i = i + 1
       }
+
+      iterno = iterno + 1
+      SimpleBlas.copy(dir, lastdir)
+      lastnorm = ngrad
+    }
+    x.data
+  }
+
+  /**
+   * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject
+   * to nonnegativity constraints if `nonnegative` is true.
+   */
+  def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix): Array[Double] = {
+    if (!nonnegative) {
+      Solve.solvePositive(ata, atb).data
+    } else {
+      NNLSbyPCG.solve(ata, atb, true)
     }
   }
 
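Both solvers work on the normal equations: they take A^T A and A^T b, which ALS already has on hand as fullXtX and userXy(index), rather than A and b themselves. The steplen helper is the exact line search for this quadratic: along a direction d with residual r = (A^T A)x - A^T b, the objective is minimised at step (d^T r) / (d^T (A^T A) d), which is what the gemv/dot pair computes. As a rough sketch of the contract, a standalone call might look like the following; the matrices are invented, and this assumes NNLSbyPCG.solve keeps the (ata, atb, nonnegative) signature used above:

    import org.jblas.DoubleMatrix

    // A tiny 3x2 least-squares problem; jblas fills data column-major.
    val A = new DoubleMatrix(3, 2, 1.0, 1.0, 1.0, 0.0, 1.0, 2.0)
    val b = new DoubleMatrix(3, 1, 6.0, 0.0, 3.0)

    // The solver consumes the normal-equation inputs A^T A and A^T b.
    val ata = A.transpose().mmul(A)
    val atb = A.transpose().mmul(b)

    // With nonnegative = true, every component of x is constrained to be >= 0.
    val x: Array[Double] = NNLSbyPCG.solve(ata, atb, true)
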
@@ -532,6 +664,34 @@ class ALS private (
  */
 object ALS {
 
+  /**
+   * Train a matrix factorization model given an RDD of ratings given by users to some products,
+   * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
+   * product of two lower-rank matrices of a given rank (number of features). To solve for these
+   * features, we run a given number of iterations of ALS. This is done using a level of
+   * parallelism given by `blocks`.
+   *
+   * @param ratings RDD of (userID, productID, rating) pairs
+   * @param rank number of features to use
+   * @param iterations number of iterations of ALS (recommended: 10-20)
+   * @param lambda regularization factor (recommended: 0.01)
+   * @param blocks level of parallelism to split computation into
+   * @param seed random seed
+   * @param nonnegative whether to impose nonnegativity constraints
+   */
+  def train(
+      ratings: RDD[Rating],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int,
+      seed: Long,
+      nonnegative: Boolean): MatrixFactorizationModel = {
+    val als = new ALS(blocks, rank, iterations, lambda, false, 1.0, seed)
+    if (nonnegative) als.setNonnegative(true)
+    als.run(ratings)
+  }
+
   /**
    * Train a matrix factorization model given an RDD of ratings given by users to some products,
    * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
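A hypothetical call to the new overload; sc, the IDs, and the rating values are all made up for illustration:

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    val ratings = sc.parallelize(Seq(
      Rating(1, 1, 5.0), Rating(1, 2, 1.0), Rating(2, 1, 4.0)))
    // rank 10, 20 iterations, lambda 0.01, 4 blocks, a fixed seed, nonnegative factors
    val model = ALS.train(ratings, 10, 20, 0.01, 4, 42L, nonnegative = true)

Every entry of the returned user and product feature vectors is then nonnegative, which is what makes the result behave like a (regularized) nonnegative matrix factorization.
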
@@ -613,6 +773,37 @@ object ALS {
     train(ratings, rank, iterations, 0.01, -1)
   }
 
+  /**
+   * Train a matrix factorization model given an RDD of 'implicit preferences' given by users
+   * to some products, in the form of (userID, productID, preference) pairs. We approximate the
+   * ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
+   * To solve for these features, we run a given number of iterations of ALS. This is done using
+   * a level of parallelism given by `blocks`.
+   *
+   * @param ratings RDD of (userID, productID, rating) pairs
+   * @param rank number of features to use
+   * @param iterations number of iterations of ALS (recommended: 10-20)
+   * @param lambda regularization factor (recommended: 0.01)
+   * @param blocks level of parallelism to split computation into
+   * @param alpha confidence parameter (only applies when implicitPrefs = true)
+   * @param seed random seed
+   * @param nonnegative whether to impose nonnegativity upon the user and product factors
+   */
+  def trainImplicit(
+      ratings: RDD[Rating],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int,
+      alpha: Double,
+      seed: Long,
+      nonnegative: Boolean): MatrixFactorizationModel = {
+    new ALS(blocks, rank, iterations, lambda, true, alpha, seed)
+      .setNonnegative(nonnegative)
+      .run(ratings)
+  }
+
   /**
    * Train a matrix factorization model given an RDD of 'implicit preferences' given by users
    * to some products, in the form of (userID, productID, preference) pairs. We approximate the
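The implicit-feedback counterpart, again with invented data; alpha = 40.0 simply echoes a common choice from the implicit-feedback literature, not a requirement:

    // Preferences such as view counts rather than explicit ratings.
    val views = sc.parallelize(Seq(
      Rating(1, 1, 12.0), Rating(1, 3, 2.0), Rating(2, 1, 7.0)))
    val model = ALS.trainImplicit(views, 10, 15, 0.01, 4, 40.0, 42L, nonnegative = true)
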