
Commit 511f52f

rxincloud-fan authored and committed
[SPARK-16964][SQL] Remove private[sql] and private[spark] from sql.execution package
## What changes were proposed in this pull request?

This package is meant to be internal, and as a result it does not make sense to mark things as private[sql] or private[spark]. It simply makes debugging harder when Spark developers need to inspect the plans at runtime. This patch removes all private[sql] and private[spark] visibility modifiers in org.apache.spark.sql.execution.

## How was this patch tested?

N/A - just visibility changes.

Author: Reynold Xin <[email protected]>

Closes #14554 from rxin/remote-private.
1 parent 62e6212 commit 511f52f
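
For readers less familiar with Scala's qualified access modifiers, the sketch below (hypothetical package and class names, not Spark code) shows the rule this patch relaxes: private[sql] limits visibility to the org.apache.spark.sql package tree, while dropping the qualifier makes a definition reachable from anywhere, for example from a debugger or a REPL session.

// Illustration only: how a qualified-private modifier scopes visibility in Scala.
package pkga {
  package impl {
    private[pkga] class Internal { def describe(): String = "internal" } // visible only under pkga
    class Public { def describe(): String = "public" }                   // visible everywhere
  }
  object SameTree {
    val ok: String = new impl.Internal().describe() // compiles: SameTree lives inside pkga
  }
}

package other {
  object OutsideTree {
    // new pkga.impl.Internal()                        // would not compile: outside pkga
    val ok: String = new pkga.impl.Public().describe() // fine, mirrors the classes after this patch
  }
}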

63 files changed: +170 / -177 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 11 additions & 11 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

/** Holds a cached logical plan and its data */
-private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
+case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)

/**
 * Provides support in a SQLContext for caching query results and automatically using these cached
@@ -41,7 +41,7 @@ private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMe
 *
 * Internal to Spark SQL.
 */
-private[sql] class CacheManager extends Logging {
+class CacheManager extends Logging {

  @transient
  private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
@@ -68,13 +68,13 @@ private[sql] class CacheManager extends Logging {
  }

  /** Clears all cached tables. */
-  private[sql] def clearCache(): Unit = writeLock {
+  def clearCache(): Unit = writeLock {
    cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
    cachedData.clear()
  }

  /** Checks if the cache is empty. */
-  private[sql] def isEmpty: Boolean = readLock {
+  def isEmpty: Boolean = readLock {
    cachedData.isEmpty
  }

@@ -83,7 +83,7 @@ private[sql] class CacheManager extends Logging {
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
-  private[sql] def cacheQuery(
+  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
@@ -108,7 +108,7 @@ private[sql] class CacheManager extends Logging {
   * Tries to remove the data for the given [[Dataset]] from the cache.
   * No operation, if it's already uncached.
   */
-  private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock {
+  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock {
    val planToCache = query.queryExecution.analyzed
    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
    val found = dataIndex >= 0
@@ -120,17 +120,17 @@ private[sql] class CacheManager extends Logging {
  }

  /** Optionally returns cached data for the given [[Dataset]] */
-  private[sql] def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
+  def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
    lookupCachedData(query.queryExecution.analyzed)
  }

  /** Optionally returns cached data for the given [[LogicalPlan]]. */
-  private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
+  def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
    cachedData.find(cd => plan.sameResult(cd.plan))
  }

  /** Replaces segments of the given logical plan with cached versions where possible. */
-  private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
+  def useCachedData(plan: LogicalPlan): LogicalPlan = {
    plan transformDown {
      case currentFragment =>
        lookupCachedData(currentFragment)
@@ -143,7 +143,7 @@ private[sql] class CacheManager extends Logging {
   * Invalidates the cache of any data that contains `plan`. Note that it is possible that this
   * function will over invalidate.
   */
-  private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
+  def invalidateCache(plan: LogicalPlan): Unit = writeLock {
    cachedData.foreach {
      case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
        data.cachedRepresentation.recache()
@@ -155,7 +155,7 @@ private[sql] class CacheManager extends Logging {
   * Invalidates the cache of any data that contains `resourcePath` in one or more
   * `HadoopFsRelation` node(s) as part of its logical plan.
   */
-  private[sql] def invalidateCachedPath(
+  def invalidateCachedPath(
      sparkSession: SparkSession, resourcePath: String): Unit = writeLock {
    val (fs, qualifiedPath) = {
      val path = new Path(resourcePath)
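
With CacheManager and CachedData no longer private[sql], code outside org.apache.spark.sql can inspect the cache while debugging, which is the motivation stated in the commit message. A rough sketch follows; how you obtain the CacheManager instance depends on the Spark version (for example via SparkSession.sharedState.cacheManager in later releases), so treat that as an assumption.

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.CacheManager

// Report whether a query's analyzed plan has a cache entry and, if so, show the
// in-memory relation that backs it. Both methods appear in the diff above.
def describeCacheEntry(cacheManager: CacheManager, query: Dataset[_]): String =
  cacheManager.lookupCachedData(query) match {
    case Some(cached) => s"cached as: ${cached.cachedRepresentation}"
    case None         => "not cached"
  }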

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 5 additions & 5 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.Utils

-private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
  val relation: BaseRelation
  val metastoreTableIdentifier: Option[TableIdentifier]

@@ -48,7 +48,7 @@ private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
}

/** Physical plan node for scanning data from a relation. */
-private[sql] case class RowDataSourceScanExec(
+case class RowDataSourceScanExec(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    @transient relation: BaseRelation,
@@ -57,7 +57,7 @@ private[sql] case class RowDataSourceScanExec(
    override val metastoreTableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec {

-  private[sql] override lazy val metrics =
+  override lazy val metrics =
    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  val outputUnsafeRows = relation match {
@@ -138,7 +138,7 @@ private[sql] case class RowDataSourceScanExec(
 * @param dataFilters Data source filters to use for filtering data within partitions.
 * @param metastoreTableIdentifier
 */
-private[sql] case class FileSourceScanExec(
+case class FileSourceScanExec(
    @transient relation: HadoopFsRelation,
    output: Seq[Attribute],
    outputSchema: StructType,
@@ -211,7 +211,7 @@ private[sql] case class FileSourceScanExec(
    inputRDD :: Nil
  }

-  private[sql] override lazy val metrics =
+  override lazy val metrics =
    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))

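
The recurring metrics change in these files makes each operator's SQL metrics map readable from outside the package. A sketch of how that might be used from spark-shell, assuming a 2.x-style Dataset.queryExecution.executedPlan and an SQLMetric value accessor:

// Walk the executed physical plan and print each node's metric values.
val df = spark.range(1000).selectExpr("id % 10 AS k").groupBy("k").count()
df.collect() // run the query so the metrics are populated
df.queryExecution.executedPlan.foreach { node =>
  node.metrics.foreach { case (name, metric) =>
    println(s"${node.nodeName}.$name = ${metric.value}")
  }
}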

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 7 additions & 7 deletions
@@ -67,7 +67,7 @@ object RDDConversions {
  }
}

-private[sql] object ExternalRDD {
+object ExternalRDD {

  def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
    val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
@@ -76,7 +76,7 @@ private[sql] object ExternalRDD {
}

/** Logical plan node for scanning data from an RDD. */
-private[sql] case class ExternalRDD[T](
+case class ExternalRDD[T](
    outputObjAttr: Attribute,
    rdd: RDD[T])(session: SparkSession)
  extends LeafNode with ObjectProducer with MultiInstanceRelation {
@@ -103,11 +103,11 @@ private[sql] case class ExternalRDD[T](
}

/** Physical plan node for scanning data from an RDD. */
-private[sql] case class ExternalRDDScanExec[T](
+case class ExternalRDDScanExec[T](
    outputObjAttr: Attribute,
    rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec {

-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  protected override def doExecute(): RDD[InternalRow] = {
@@ -128,7 +128,7 @@ private[sql] case class ExternalRDDScanExec[T](
}

/** Logical plan node for scanning data from an RDD of InternalRow. */
-private[sql] case class LogicalRDD(
+case class LogicalRDD(
    output: Seq[Attribute],
    rdd: RDD[InternalRow])(session: SparkSession)
  extends LeafNode with MultiInstanceRelation {
@@ -155,12 +155,12 @@ private[sql] case class LogicalRDD(
}

/** Physical plan node for scanning data from an RDD of InternalRow. */
-private[sql] case class RDDScanExec(
+case class RDDScanExec(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    override val nodeName: String) extends LeafExecNode {

-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  protected override def doExecute(): RDD[InternalRow] = {

sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ case class ExpandExec(
    child: SparkPlan)
  extends UnaryExecNode with CodegenSupport {

-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  // The GroupExpressions can output data with arbitrary partitioning, so set it

sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ package org.apache.spark.sql.execution
 * the list of paths that it returns will be returned to a user who calls `inputPaths` on any
 * DataFrame that queries this relation.
 */
-private[sql] trait FileRelation {
+trait FileRelation {
  /** Returns the list of files that will be read when scanning this relation. */
  def inputFiles: Array[String]
}
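
Since FileRelation is now public, a relation defined outside the sql package can mix it in so that file-path reporting works for it too. A minimal sketch with a made-up class name:

import org.apache.spark.sql.execution.FileRelation

// Hypothetical relation over a fixed directory listing; only the trait contract matters here.
class CsvDirectoryRelation(directory: String, fileNames: Seq[String]) extends FileRelation {
  override def inputFiles: Array[String] = fileNames.map(name => s"$directory/$name").toArray
}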

sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ case class GenerateExec(
    child: SparkPlan)
  extends UnaryExecNode {

-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  override def producedAttributes: AttributeSet = AttributeSet(output)

sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala

Lines changed: 2 additions & 2 deletions
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
/**
 * Physical plan node for scanning data from a local collection.
 */
-private[sql] case class LocalTableScanExec(
+case class LocalTableScanExec(
    output: Seq[Attribute],
    rows: Seq[InternalRow]) extends LeafExecNode {

-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  private val unsafeRows: Array[InternalRow] = {

sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 * iterator to consume the next row, whereas RowIterator combines these calls into a single
 * [[advanceNext()]] method.
 */
-private[sql] abstract class RowIterator {
+abstract class RowIterator {
  /**
   * Advance this iterator by a single row. Returns `false` if this iterator has no more rows
   * and `true` otherwise. If this returns `true`, then the new row can be retrieved by calling
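
The class comment above is truncated mid-sentence in this view; the pattern it describes is a single advanceNext() call replacing the hasNext/next() pair, with the current row read through a separate accessor. A minimal sketch of a concrete subclass, assuming that accessor is getRow:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.RowIterator

// Iterate over a pre-materialized array of rows using the advanceNext()/getRow protocol.
class ArrayRowIterator(rows: Array[InternalRow]) extends RowIterator {
  private var index = -1
  override def advanceNext(): Boolean = { index += 1; index < rows.length }
  override def getRow: InternalRow = rows(index)
}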

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
  SparkListenerSQLExecutionStart}

-private[sql] object SQLExecution {
+object SQLExecution {

  val EXECUTION_ID_KEY = "spark.sql.execution.id"

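
EXECUTION_ID_KEY names the SparkContext local property that ties Spark jobs back to a SQL execution. With SQLExecution public, monitoring or debugging code can read it directly; a small sketch, assuming a spark-shell session where spark is the SparkSession:

import org.apache.spark.sql.execution.SQLExecution

// Returns the SQL execution id set on the current thread, if a query is being executed.
val currentExecutionId: Option[String] =
  Option(spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))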

sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala

Lines changed: 2 additions & 4 deletions
@@ -22,11 +22,9 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.unsafe.sort.RadixSort;

/**
 * Performs (external) sorting.
@@ -52,7 +50,7 @@ case class SortExec(

  private val enableRadixSort = sqlContext.conf.enableRadixSort

-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
    "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
    "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
