Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
0.8.0, 0.8.1, 0.9.0
None
Description
If an operation returns a generating iterator (i.e. one that creates its return values as the 'next' method is called), for example as the result of a 'flatMap' call on an RDD, the CacheManager first completely unrolls the iterator into an array buffer before passing it to the BlockManager (CacheManager.scala:74). Only after the entire iterator has been put into the buffer does it check whether there is enough space in memory to store the data (BlockManager.scala:608).
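The difference between consuming a generating iterator lazily and unrolling it can be illustrated without Spark. The following is only a minimal sketch in plain Scala; the names UnrollDemo, generating and unroll are hypothetical and are not part of CacheManager or BlockManager.

```scala
import scala.collection.mutable.ArrayBuffer

object UnrollDemo {
  // Counts how many elements have actually been created so far.
  var generated = 0

  // A generating iterator: each element is created only when next() is called.
  def generating(n: Int): Iterator[String] =
    Iterator.tabulate(n) { i =>
      generated += 1
      "record-" + i
    }

  // Unrolling: drains the whole iterator into a buffer, so every element
  // is held in memory at once before any size check can happen.
  def unroll(it: Iterator[String]): ArrayBuffer[String] = {
    val buffer = new ArrayBuffer[String]
    buffer ++= it
    buffer
  }
}
```

Calling next() once on generating(n) creates a single element, while unroll(generating(n)) creates and retains all n of them, which is roughly the situation described above.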
In the attached test, 'saveAsTextFile' completes when it is called directly on the RDD produced by the flatMap operation. It is given an iterator and performs the map-then-save work as the results are generated. In the other branch, 'persist' is called, and the CacheManager first tries to unroll the entire iterator before deciding to store it to disk. This causes an out-of-memory error (on systems with -Xmx512m).
In cases where storing to disk is an option, perhaps the CacheManager (or the BlockManager) could scan the iterator incrementally, calculating its size as each element is pushed into a buffer (rather than pushing everything into a buffer in a single operation). If it determines that it will run out of memory, it could start pushing the already buffered portion of the iterator to disk, and then finish scanning the original iterator, pushing the remainder to disk as well.
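The suggestion above could look roughly like the following. This is only a sketch under stated assumptions, not how Spark implements it: SpillingUnroll, estimateSize, memoryBudget and spillFile are hypothetical names, the size estimate (2 bytes per character) is a crude stand-in for real size estimation, and a plain text file stands in for the disk store.

```scala
import java.io.{File, PrintWriter}
import scala.collection.mutable.ArrayBuffer

object SpillingUnroll {
  // Outcome of unrolling: everything fit in memory, or it was written to disk.
  sealed trait Unrolled
  case class InMemory(values: ArrayBuffer[String]) extends Unrolled
  case class OnDisk(file: File, count: Long) extends Unrolled

  // Hypothetical size estimate: roughly 2 bytes per character.
  def estimateSize(s: String): Long = 2L * s.length

  def unroll(it: Iterator[String], memoryBudget: Long, spillFile: File): Unrolled = {
    val buffer = new ArrayBuffer[String]
    var bufferedBytes = 0L
    // Buffer one element at a time, tracking the estimated size as we go.
    while (it.hasNext && bufferedBytes <= memoryBudget) {
      val v = it.next()
      buffer += v
      bufferedBytes += estimateSize(v)
    }
    if (bufferedBytes <= memoryBudget) {
      // The iterator was exhausted without exceeding the budget.
      InMemory(buffer)
    } else {
      val out = new PrintWriter(spillFile)
      try {
        var count = 0L
        // First write out the portion that was already buffered...
        buffer.foreach { v => out.println(v); count += 1 }
        buffer.clear()
        // ...then stream the rest of the iterator straight to disk.
        it.foreach { v => out.println(v); count += 1 }
        OnDisk(spillFile, count)
      } finally out.close()
    }
  }
}
```

With this shape, memory use is bounded by the budget plus one element, because the iterator is never fully materialized once the budget has been exceeded.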
Example Code (switch value of 'fail' variable to toggle behavior):
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Generating iterator: creates each string only when next() is called.
class Expander(base: String, count: Int) extends Iterator[String] {
  var i = 0
  def next(): String = {
    i += 1
    base + i.toString
  }
  def hasNext: Boolean = i < count
}

object MemTest {
  def expand(s: String, i: Int): Iterator[String] = new Expander(s, i)

  def main(args: Array[String]): Unit = {
    // Set to true to take the 'persist' branch that runs out of memory.
    val fail = false
    val sc = new SparkContext("local", "mem_test")
    val seeds = sc.parallelize(
      Array(
        "This is the first sentence that we will test:",
        "This is the second sentence that we will test:",
        "This is the third sentence that we will test:"
      )
    )
    val out = seeds.flatMap(expand(_, 10000000))
    if (fail) {
      // The CacheManager unrolls the whole iterator before storing it.
      out.map(_ + "...").persist(StorageLevel.MEMORY_AND_DISK).saveAsTextFile("test.out")
    } else {
      // Results are written out as the iterator generates them.
      out.map(_ + "...").saveAsTextFile("test.out")
    }
  }
}
Issue Links
- is related to SPARK-1201 Do not materialize partitions whenever possible in BlockManager (Resolved)