
feat: replace rdd.cache() by checkpoint / localCheckpoint in AggregateMessages #667

@SemyonSinchenko

Description


Is your feature request related to a problem? Please describe.

This code can (and should) be replaced by a checkpoint. It should be handled the same way as in other places: if the config for using localCheckpoint is set, use localCheckpoint; otherwise use checkpoint.

  /**
   * Create a new cached copy of a DataFrame. For iterative DataFrame-based algorithms.
   *
   * WARNING: This is NOT the same as `DataFrame.cache()`. The original DataFrame will NOT be
   * cached.
   *
   * This is a workaround for SPARK-13346, which makes it difficult to use DataFrames in iterative
   * algorithms. This workaround converts the DataFrame to an RDD, caches the RDD, and creates a
   * new DataFrame. This is important for avoiding the creation of extremely complex DataFrame
   * query plans when using DataFrames in iterative algorithms.
   */
  def getCachedDataFrame(df: DataFrame): DataFrame = {
    val rdd = df.rdd.cache()
    // rdd.count()
    df.sparkSession.createDataFrame(rdd, df.schema)
  }
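A minimal sketch of the proposed replacement, assuming a hypothetical helper name (`getCheckpointedDataFrame`) and a caller-supplied flag standing in for the project's localCheckpoint config option. It uses the standard `Dataset.checkpoint` / `Dataset.localCheckpoint` API, which truncates the logical plan and so addresses the same SPARK-13346 plan-growth problem that the RDD round-trip works around:

```scala
import org.apache.spark.sql.DataFrame

/**
 * Sketch: truncate the query plan of a DataFrame for iterative algorithms.
 *
 * Unlike the rdd.cache() workaround, Dataset.checkpoint and
 * Dataset.localCheckpoint cut the logical plan directly, so no
 * RDD round-trip or schema re-application is needed.
 *
 * `useLocalCheckpoint` stands in for the project's configuration flag.
 */
def getCheckpointedDataFrame(df: DataFrame, useLocalCheckpoint: Boolean): DataFrame = {
  if (useLocalCheckpoint) {
    // localCheckpoint writes to executor storage: faster, but the data is
    // lost (and the job fails) if an executor dies.
    df.localCheckpoint(eager = true)
  } else {
    // A reliable checkpoint writes to the checkpoint directory, which must
    // have been configured via spark.sparkContext.setCheckpointDir(...).
    df.checkpoint(eager = true)
  }
}
```

With `eager = true` the checkpoint is materialized immediately, which also replaces the role of the commented-out `rdd.count()` in the current code.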

Describe the solution you would like

Component

  • Scala Core Internal
  • Scala API
  • Spark Connect Plugin
  • Infrastructure
  • PySpark Classic
  • PySpark Connect

Additional context
Performance + code unification

Are you planning on creating a PR?

  • I'm willing to make a pull-request

Metadata

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
