SPARK-942: Do not materialize partitions when DISK_ONLY storage level is used


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0, 0.8.1, 0.9.0
    • Fix Version/s: 1.0.0
    • Component/s: Block Manager, Spark Core
    • Labels: None

    Description

      If an operation returns a generating iterator (i.e. one that produces its values lazily as 'next' is called), for example the result of a 'flatMap' call on an RDD, the CacheManager first unrolls the entire iterator into an ArrayBuffer before passing it to the BlockManager (CacheManager.scala:74). Only after the whole iterator has been materialized into the buffer does the BlockManager check whether there is enough memory to store the data (BlockManager.scala:608).
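      For illustration, a minimal sketch of that flow (a paraphrase only, not the actual Spark source; 'putIntoBlockManager' is a stand-in for BlockManager.put):

      import scala.collection.mutable.ArrayBuffer
      
      // Paraphrase of the problematic flow: unroll first, check memory later.
      object EagerUnrollSketch {
        def cacheAndReturn[T](computedValues: Iterator[T]): Iterator[T] = {
          // 1. The entire iterator is drained into memory, regardless of the
          //    requested storage level; a large generating iterator OOMs here.
          val elements = new ArrayBuffer[T]
          elements ++= computedValues
          // 2. Only the fully materialized buffer reaches the block manager,
          //    which is where the memory check and any disk spill happen.
          putIntoBlockManager(elements)
          elements.iterator
        }

        // Stand-in for BlockManager.put; the memory check happens in here,
        // after the data has already been materialized.
        private def putIntoBlockManager[T](elements: ArrayBuffer[T]): Unit = ()
      }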
      In the attached test, the job can complete a 'saveAsTextFile' of the generated strings when it is called directly on the RDD produced by the flatMap, because it receives an iterator and performs the map-then-save work as the results are generated. In the other branch, 'persist' is called first, and the CacheManager tries to unroll the entire iterator before deciding to store it to disk; this causes an OutOfMemoryError (on systems with -Xmx512m).

      In the cases where storing to disk is an option, perhaps the CacheManager (or the BlockManager) could scan the iterator incrementally, tracking the estimated size as elements are pushed into a buffer (rather than pushing everything into the buffer in a single operation). If it determines that it is about to run out of memory, it would write the already-buffered portion to disk and then stream the rest of the original iterator to disk as well, as sketched below.
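      A minimal sketch of that approach (illustrative only; the memory budget, 'estimateSize', and 'writeToDisk' are assumptions, not Spark APIs):

      import scala.collection.mutable.ArrayBuffer
      
      // Sketch: unroll incrementally and spill to disk once a budget is hit.
      object IncrementalUnrollSketch {
        val memoryBudgetBytes: Long = 64L * 1024 * 1024 // illustrative budget

        // Returns Some(buffer) if everything fit in memory, or None after
        // streaming all elements to disk via the supplied callback.
        def putIterator[T](values: Iterator[T], estimateSize: T => Long)
                          (writeToDisk: T => Unit): Option[ArrayBuffer[T]] = {
          val buffer = new ArrayBuffer[T]
          var bufferedBytes = 0L
          while (values.hasNext) {
            val v = values.next()
            bufferedBytes += estimateSize(v)
            if (bufferedBytes > memoryBudgetBytes) {
              // Budget exceeded: flush what is already buffered, then stream
              // the current element and the rest of the iterator to disk.
              buffer.foreach(writeToDisk)
              writeToDisk(v)
              values.foreach(writeToDisk)
              return None
            }
            buffer += v
          }
          Some(buffer) // everything fit in memory
        }
      }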

      Example Code (switch the value of the 'fail' variable to toggle the behavior):

      MemTest.scala
      import org.apache.spark.SparkContext
      import org.apache.spark.storage.StorageLevel
      
      // Lazily generates 'count' strings of the form base + index, so the
      // full sequence never has to exist in memory at once.
      class Expander(base: String, count: Int) extends Iterator[String] {
        private var i = 0
        def hasNext: Boolean = i < count
        def next(): String = {
          i += 1
          base + i.toString
        }
      }
      
      object MemTest {
        def expand(s: String, i: Int): Iterator[String] = new Expander(s, i)
      
        def main(args: Array[String]): Unit = {
          val fail = false // set to true to trigger the OutOfMemoryError
          val sc = new SparkContext("local", "mem_test")
          val seeds = sc.parallelize(Array(
            "This is the first sentence that we will test:",
            "This is the second sentence that we will test:",
            "This is the third sentence that we will test:"
          ))
          // Each seed expands to 10 million strings via a generating iterator.
          val out = seeds.flatMap(expand(_, 10000000))
          if (fail) {
            // persist() makes the CacheManager unroll the iterator first.
            out.map(_ + "...").persist(StorageLevel.MEMORY_AND_DISK).saveAsTextFile("test.out")
          } else {
            // Without persist(), results stream straight to the output files.
            out.map(_ + "...").saveAsTextFile("test.out")
          }
        }
      }
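
      To reproduce, run MemTest with a constrained heap, for example (the classpath here is illustrative and depends on your build):

      java -Xmx512m -cp <spark-assembly-jar>:<app-classes> MemTest

      With 'fail' set to false the job streams to disk and completes; with it set to true, the unroll inside the CacheManager exhausts the heap.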
      
      


    People

    • Assignee: Kyle Ellrott (kellrott)
    • Reporter: Kyle Ellrott (kellrott)
    • Votes: 1
    • Watchers: 5
