@@ -30,13 +30,25 @@ import org.apache.spark.storage.StorageLevel
3030 *
3131 * @param numRowBlocks Number of blocks that form the rows of the matrix.
3232 * @param numColBlocks Number of blocks that form the columns of the matrix.
 * @param suggestedNumPartitions Number of partitions to partition the rdd into. The final number
 *                               of partitions will be set to `min(suggestedNumPartitions,
 *                               numRowBlocks * numColBlocks)`, because setting the number of
 *                               partitions greater than the number of sub-matrices is not useful.
3337 */
3438private [mllib] class GridPartitioner (
3539 val numRowBlocks : Int ,
3640 val numColBlocks : Int ,
37- val numParts : Int ) extends Partitioner {
41+ suggestedNumPartitions : Int ) extends Partitioner {
3842 // Having the number of partitions greater than the number of sub matrices does not help
39- override val numPartitions = math.min(numParts, numRowBlocks * numColBlocks)
43+ override val numPartitions = math.min(suggestedNumPartitions, numRowBlocks * numColBlocks)
44+
45+ val totalBlocks = numRowBlocks.toLong * numColBlocks
46+ // Gives the number of blocks that need to be in each partition
47+ val targetNumBlocksPerPartition = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
48+ // Number of neighboring blocks to take in each row
49+ val numRowBlocksPerPartition = math.ceil(numRowBlocks * 1.0 / targetNumBlocksPerPartition).toInt
50+ // Number of neighboring blocks to take in each column
51+ val numColBlocksPerPartition = math.ceil(numColBlocks * 1.0 / targetNumBlocksPerPartition).toInt
4052
4153 /**
4254 * Returns the index of the partition the SubMatrix belongs to. Tries to achieve block wise
@@ -51,27 +63,20 @@ private[mllib] class GridPartitioner(
5163 override def getPartition (key : Any ): Int = {
5264 key match {
5365 case (blockRowIndex : Int , blockColIndex : Int ) =>
54- getBlockId (blockRowIndex, blockColIndex)
66+ getPartitionId (blockRowIndex, blockColIndex)
5567 case (blockRowIndex : Int , innerIndex : Int , blockColIndex : Int ) =>
56- getBlockId (blockRowIndex, blockColIndex)
68+ getPartitionId (blockRowIndex, blockColIndex)
5769 case _ =>
5870 throw new IllegalArgumentException (s " Unrecognized key. key: $key" )
5971 }
6072 }
6173
6274 /** Partitions sub-matrices as blocks with neighboring sub-matrices. */
63- private def getBlockId (blockRowIndex : Int , blockColIndex : Int ): Int = {
64- val totalBlocks = numRowBlocks * numColBlocks
65- // Gives the number of blocks that need to be in each partition
66- val partitionRatio = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
67- // Number of neighboring blocks to take in each row
68- val subBlocksPerRow = math.ceil(numRowBlocks * 1.0 / partitionRatio).toInt
69- // Number of neighboring blocks to take in each column
70- val subBlocksPerCol = math.ceil(numColBlocks * 1.0 / partitionRatio).toInt
75+ private def getPartitionId (blockRowIndex : Int , blockColIndex : Int ): Int = {
7176 // Coordinates of the block
72- val i = blockRowIndex / subBlocksPerRow
73- val j = blockColIndex / subBlocksPerCol
74- val blocksPerRow = math.ceil(numRowBlocks * 1.0 / subBlocksPerRow ).toInt
77+ val i = blockRowIndex / numRowBlocksPerPartition
78+ val j = blockColIndex / numColBlocksPerPartition
79+ val blocksPerRow = math.ceil(numRowBlocks * 1.0 / numRowBlocksPerPartition ).toInt
7580 j * blocksPerRow + i
7681 }
7782
@@ -91,10 +96,10 @@ private[mllib] class GridPartitioner(
9196 * Represents a distributed matrix in blocks of local matrices.
9297 *
9398 * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
94- * @param nRows Number of rows of this matrix
95- * @param nCols Number of columns of this matrix
96- * @param numRowBlocks Number of blocks that form the rows of this matrix
97- * @param numColBlocks Number of blocks that form the columns of this matrix
99+ * @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
100+ * the number of rows will be calculated when `numRows` is invoked.
101+ * @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
102+ * zero, the number of columns will be calculated when `numCols` is invoked.
98103 * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
99104 * rows are not required to have the given number of rows
100105 * @param colsPerBlock Number of columns that make up each block. The blocks forming the final
@@ -104,8 +109,6 @@ class BlockMatrix(
104109 val rdd : RDD [((Int , Int ), Matrix )],
105110 private var nRows : Long ,
106111 private var nCols : Long ,
107- val numRowBlocks : Int ,
108- val numColBlocks : Int ,
109112 val rowsPerBlock : Int ,
110113 val colsPerBlock : Int ) extends DistributedMatrix with Logging {
111114
@@ -115,25 +118,18 @@ class BlockMatrix(
115118 * Alternate constructor for BlockMatrix without the input of the number of rows and columns.
116119 *
117120 * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
118- * @param numRowBlocks Number of blocks that form the rows of this matrix
119- * @param numColBlocks Number of blocks that form the columns of this matrix
120121 * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
121122 * rows are not required to have the given number of rows
122123 * @param colsPerBlock Number of columns that make up each block. The blocks forming the final
123124 * columns are not required to have the given number of columns
124125 */
125126 def this (
126127 rdd : RDD [((Int , Int ), Matrix )],
127- numRowBlocks : Int ,
128- numColBlocks : Int ,
129128 rowsPerBlock : Int ,
130129 colsPerBlock : Int ) = {
131- this (rdd, 0L , 0L , numRowBlocks, numColBlocks, rowsPerBlock, colsPerBlock)
130+ this (rdd, 0L , 0L , rowsPerBlock, colsPerBlock)
132131 }
133132
134- private [mllib] var partitioner : GridPartitioner =
135- new GridPartitioner (numRowBlocks, numColBlocks, rdd.partitions.length)
136-
137133 private lazy val dims : (Long , Long ) = getDim
138134
139135 override def numRows (): Long = {
@@ -146,48 +142,21 @@ class BlockMatrix(
146142 nCols
147143 }
148144
145+ val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
146+ val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt
147+
148+ private [mllib] var partitioner : GridPartitioner =
149+ new GridPartitioner (numRowBlocks, numColBlocks, rdd.partitions.length)
150+
151+
152+
149153 /** Returns the dimensions of the matrix. */
150154 private def getDim : (Long , Long ) = {
151- case class MatrixMetaData (var rowIndex : Int , var colIndex : Int ,
152- var numRows : Int , var numCols : Int )
153- // picks the sizes of the matrix with the maximum indices
154- def pickSizeByGreaterIndex (example : MatrixMetaData , base : MatrixMetaData ): MatrixMetaData = {
155- if (example.rowIndex > base.rowIndex) {
156- base.rowIndex = example.rowIndex
157- base.numRows = example.numRows
158- }
159- if (example.colIndex > base.colIndex) {
160- base.colIndex = example.colIndex
161- base.numCols = example.numCols
162- }
163- base
164- }
165-
166- // Aggregate will return an error if the rdd is empty
167- val lastRowCol = rdd.treeAggregate(new MatrixMetaData (0 , 0 , 0 , 0 ))(
168- seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
169- pickSizeByGreaterIndex(
170- new MatrixMetaData (blockXInd, blockYInd, mat.numRows, mat.numCols), base)
171- },
172- combOp = (c1, c2) => (c1, c2) match {
173- case (res1, res2) =>
174- pickSizeByGreaterIndex(res1, res2)
175- })
176- // We add the size of the edge matrices, because they can be less than the specified
177- // rowsPerBlock or colsPerBlock.
178- (lastRowCol.rowIndex.toLong * rowsPerBlock + lastRowCol.numRows,
179- lastRowCol.colIndex.toLong * colsPerBlock + lastRowCol.numCols)
180- }
155+ val (rows, cols) = rdd.map { case ((blockRowIndex, blockColIndex), mat) =>
156+ (blockRowIndex * rowsPerBlock + mat.numRows, blockColIndex * colsPerBlock + mat.numCols)
157+ }.reduce((x0, x1) => (math.max(x0._1, x1._1), math.max(x0._2, x1._2)))
181158
182- /** Returns the Frobenius Norm of the matrix */
183- def normFro (): Double = {
184- math.sqrt(rdd.map { mat => mat._2 match {
185- case sparse : SparseMatrix =>
186- sparse.values.map(x => math.pow(x, 2 )).sum
187- case dense : DenseMatrix =>
188- dense.values.map(x => math.pow(x, 2 )).sum
189- }
190- }.reduce(_ + _))
159+ (math.max(rows, nRows), math.max(cols, nCols))
191160 }
192161
193162 /** Cache the underlying RDD. */
@@ -210,14 +179,14 @@ class BlockMatrix(
210179 s " Int.MaxValue. Currently numCols: ${numCols()}" )
211180 val nRows = numRows().toInt
212181 val nCols = numCols().toInt
213- val mem = nRows * nCols * 8 / 1000000
182+ val mem = nRows.toLong * nCols / 125000
214183 if (mem > 500 ) logWarning(s " Storing this matrix will require $mem MB of memory! " )
215184
216- val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
185+ val parts = rdd.collect()
217186 val values = new Array [Double ](nRows * nCols)
218- parts.foreach { case ((rowIndex, colIndex ), block) =>
219- val rowOffset = rowIndex * rowsPerBlock
220- val colOffset = colIndex * colsPerBlock
187+ parts.foreach { case ((blockRowIndex, blockColIndex ), block) =>
188+ val rowOffset = blockRowIndex * rowsPerBlock
189+ val colOffset = blockColIndex * colsPerBlock
221190 var j = 0
222191 val mat = block.toArray
223192 while (j < block.numCols) {
0 commit comments