
Commit 67847e5

address comments from @zsxwing

1 parent 9caa2d5 commit 67847e5

32 files changed (+106, −65 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 3 additions & 7 deletions

@@ -1119,16 +1119,12 @@ case class DecimalAggregates(conf: CatalystConf) extends Rule[LogicalPlan] {
   */
 object ConvertToLocalRelation extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Project(projectList, lr @ LocalRelation(output, data))
+    case Project(projectList, lr @ LocalRelation(output, data, dataFromStreaming))
         if !projectList.exists(hasUnevaluableExpr) =>
       val projection = new InterpretedProjection(projectList, output)
       projection.initialize(0)
-      if (lr.isStreaming) {
-        LocalRelation(projectList.map(_.toAttribute), data.map(projection))
-          .setIncremental()
-      } else {
-        LocalRelation(projectList.map(_.toAttribute), data.map(projection))
-      }
+      LocalRelation(projectList.map(_.toAttribute), data.map(projection),
+        dataFromStreaming = dataFromStreaming)
   }

   private def hasUnevaluableExpr(expr: Expression): Boolean = {
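
For context: before this change, the rule had to branch on lr.isStreaming and mutate the freshly built relation via setIncremental(); threading the flag through the constructor collapses both branches into a single expression. A minimal standalone sketch of that refactor shape (toy Relation type, not the actual Catalyst classes):

// Simplified sketch of the refactor above: instead of building a relation and
// mutating it with a setter, the streaming flag is threaded through the constructor.
case class Relation(rows: Seq[Int], dataFromStreaming: Boolean = false)

object Demo extends App {
  val streaming = Relation(Seq(1, 2, 3), dataFromStreaming = true)

  // A rule that rewrites the rows can now preserve the flag in one expression,
  // with no branching on isStreaming and no post-hoc mutation:
  val projected = Relation(streaming.rows.map(_ * 2),
    dataFromStreaming = streaming.dataFromStreaming)

  assert(projected.dataFromStreaming)  // streaming-ness survives the rewrite
}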

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala

Lines changed: 12 additions & 2 deletions

@@ -43,7 +43,15 @@ object LocalRelation {
   }
 }

-case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
+/**
+ * @param dataFromStreaming indicates whether this relation comes from a streaming source.
+ *                          In a streaming query, the stream relation is cut into a
+ *                          series of batch relations.
+ */
+case class LocalRelation(
+    output: Seq[Attribute],
+    data: Seq[InternalRow] = Nil,
+    var dataFromStreaming: Boolean = false)
   extends LeafNode with analysis.MultiInstanceRelation {

   // A local relation must have resolved output.
@@ -68,12 +76,14 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)

   override def sameResult(plan: LogicalPlan): Boolean = {
     plan.canonicalized match {
-      case LocalRelation(otherOutput, otherData) =>
+      case LocalRelation(otherOutput, otherData, _) =>
         otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
       case _ => false
     }
   }

+  override def isStreaming: Boolean = dataFromStreaming
+
   override def computeStats(conf: CatalystConf): Statistics =
     Statistics(sizeInBytes =
       output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length)
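
The two overrides interact in a way worth spelling out: isStreaming is driven entirely by the new flag, while sameResult matches with a wildcard in the third position, so two relations with identical schema and data compare equal regardless of streaming origin. A toy model of that behavior (hypothetical names, not the Catalyst classes):

// Toy model of the semantics added above: the flag drives isStreaming,
// while result equality deliberately ignores it.
case class ToyLocalRelation(
    schema: Seq[String],
    rows: Seq[Int] = Nil,
    dataFromStreaming: Boolean = false) {
  def isStreaming: Boolean = dataFromStreaming
  def sameResult(other: ToyLocalRelation): Boolean =
    other.schema == schema && other.rows == rows  // the flag is not compared
}

object ToyDemo extends App {
  val batch  = ToyLocalRelation(Seq("a"), Seq(1, 2))
  val stream = ToyLocalRelation(Seq("a"), Seq(1, 2), dataFromStreaming = true)
  assert(stream.isStreaming && !batch.isStreaming)
  assert(batch.sameResult(stream))  // equal data => same result, streaming or not
}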

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 5 deletions

@@ -31,8 +31,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

   private var _analyzed: Boolean = false

-  private var _incremental: Boolean = false
-
   /**
    * Marks this plan as already analyzed. This should only be called by CheckAnalysis.
    */
@@ -45,10 +43,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   */
  def analyzed: Boolean = _analyzed

-  def setIncremental(): LogicalPlan = { _incremental = true ; this}
-
   /** Returns true if this subtree contains any streaming data sources. */
-  def isStreaming: Boolean = children.exists(_.isStreaming == true) || _incremental
+  def isStreaming: Boolean = children.exists(_.isStreaming == true)

   /**
    * Returns a copy of this node where `rule` has been recursively applied first to all of its
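
With _incremental and setIncremental() removed, isStreaming reverts to a pure structural property: a plan is streaming only if some leaf in its subtree reports itself as streaming (which LocalRelation and LogicalRDD now do via dataFromStreaming). A minimal sketch of that recursion, assuming a toy node hierarchy:

// Simplified sketch: streaming-ness propagates up from the leaves,
// with no mutable per-node override anywhere in the tree.
trait Node {
  def children: Seq[Node]
  def isStreaming: Boolean = children.exists(_.isStreaming)
}
case class Leaf(streaming: Boolean) extends Node {
  override def children: Seq[Node] = Nil
  override def isStreaming: Boolean = streaming  // leaves decide for themselves
}
case class Branch(children: Node*) extends Node

object WalkDemo extends App {
  assert(Branch(Leaf(false), Branch(Leaf(true))).isStreaming)  // streaming leaf below
  assert(!Branch(Leaf(false), Leaf(false)).isStreaming)        // pure batch tree
}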

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 2 additions & 2 deletions

@@ -348,15 +348,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       case (true, SaveMode.Overwrite) =>
         // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
-          case LogicalRelation(src: BaseRelation, _, _) => src
+          case LogicalRelation(src: BaseRelation, _, _, _) => src
           case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
             relation.tableMeta.identifier
         }

         val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
         EliminateSubqueryAliases(tableRelation) match {
           // check if the table is a data source table (the relation is a BaseRelation).
-          case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
+          case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           // check hive table relation when overwrite mode
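
This hunk, and most of the remaining ones, are the same mechanical update: adding a field to a case class widens its generated unapply, so every existing LogicalRelation(...) pattern needs a trailing wildcard or it no longer compiles. A minimal reproduction with a hypothetical three-field stand-in for the real class:

// Before the change: case class Rel(src: String, output: Int, table: Option[String])
// After a fourth field is added, three-element patterns stop compiling:
case class Rel(
    src: String,
    output: Int,
    table: Option[String],
    dataFromStreaming: Boolean = false)

object ArityDemo extends App {
  val r = Rel("parquet", 3, None)
  r match {
    // case Rel(s, _, _) => ...           // no longer compiles: wrong number of patterns
    case Rel(s, _, _, _) => println(s)    // an extra wildcard absorbs the new field
  }
}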

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 1 addition & 1 deletion

@@ -2728,7 +2728,7 @@ class Dataset[T] private[sql](
   */
  def inputFiles: Array[String] = {
    val files: Seq[String] = queryExecution.optimizedPlan.collect {
-      case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
+      case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) =>
        fsBasedRelation.inputFiles
      case fr: FileRelation =>
        fr.inputFiles

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 11 additions & 3 deletions

@@ -127,12 +127,18 @@ case class ExternalRDDScanExec[T](
   }
 }

-/** Logical plan node for scanning data from an RDD of InternalRow. */
+/** Logical plan node for scanning data from an RDD of InternalRow.
+ *
+ * @param dataFromStreaming indicates whether this relation comes from a streaming source.
+ *                          In a streaming query, the stream relation is cut into a
+ *                          series of batch relations.
+ */
 case class LogicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     outputPartitioning: Partitioning = UnknownPartitioning(0),
-    outputOrdering: Seq[SortOrder] = Nil)(session: SparkSession)
+    outputOrdering: Seq[SortOrder] = Nil,
+    var dataFromStreaming: Boolean = false)(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {

   override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
@@ -163,11 +169,13 @@ case class LogicalRDD(

   override def sameResult(plan: LogicalPlan): Boolean = {
     plan.canonicalized match {
-      case LogicalRDD(_, otherRDD, _, _) => rdd.id == otherRDD.id
+      case LogicalRDD(_, otherRDD, _, _, _) => rdd.id == otherRDD.id
       case _ => false
     }
   }

+  override def isStreaming: Boolean = dataFromStreaming
+
   override protected def stringArgs: Iterator[Any] = Iterator(output)

   @transient override def computeStats(conf: CatalystConf): Statistics = Statistics(
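
A subtlety here: LogicalRDD keeps its SparkSession in a second, curried parameter list (surfaced through otherCopyArgs), and fields in that list are excluded from the generated unapply and equals. Placing dataFromStreaming in the first list is what lets sameResult's five-field pattern see it. A toy illustration of that split, with hypothetical names:

// Toy curried case class: the second parameter list is excluded from the
// generated equals/unapply, mirroring how LogicalRDD keeps its SparkSession
// out of pattern matching while the new flag stays visible to it.
case class ToyRDDPlan(id: Int, dataFromStreaming: Boolean = false)(val tag: String)

object CurriedDemo extends App {
  val a = ToyRDDPlan(1, dataFromStreaming = true)("tagA")
  val b = ToyRDDPlan(1, dataFromStreaming = true)("tagB")
  assert(a == b)  // equals is generated from the first parameter list only
  a match {
    case ToyRDDPlan(id, streaming) =>  // unapply also sees only the first list
      assert(id == 1 && streaming)
  }
}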

sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 2 additions & 2 deletions

@@ -96,7 +96,7 @@ case class OptimizeMetadataOnlyQuery(
       child transform {
         case plan if plan eq relation =>
           relation match {
-            case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) =>
               val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
               val partitionData = fsRelation.location.listFiles(filters = Nil)
               LocalRelation(partAttrs, partitionData.map(_.values))
@@ -132,7 +132,7 @@ case class OptimizeMetadataOnlyQuery(
   object PartitionedRelation {

     def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _)
+      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
         if fsRelation.partitionSchema.nonEmpty =>
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some(AttributeSet(partAttrs), l)

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 1 deletion

@@ -409,7 +409,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
       case logical.Sample(lb, ub, withReplacement, seed, child) =>
         execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
-      case logical.LocalRelation(output, data) =>
+      case logical.LocalRelation(output, data, _) =>
         LocalTableScanExec(output, data) :: Nil
       case logical.LocalLimit(IntegerLiteral(limit), child) =>
         execution.LocalLimitExec(limit, planLater(child)) :: Nil

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 1 addition & 1 deletion

@@ -421,7 +421,7 @@ case class DataSource(
     }
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
-        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+        case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
       }.head
     }
     // For partitioned relation r, r.schema's column ordering can be different from the column

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 8 additions & 7 deletions

@@ -140,12 +140,12 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)

-    case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _),
+    case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
        parts, query, overwrite, false) if parts.isEmpty =>
      InsertIntoDataSourceCommand(l, query, overwrite)

     case InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) =>
+        l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, false) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
       // the user has specified static partitions, we add a Project operator on top of the query
       // to include those constant column values in the query result.
@@ -181,7 +181,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {

       val outputPath = t.location.rootPaths.head
       val inputPaths = actualQuery.collect {
-        case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
+        case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
       }.flatten

       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
@@ -264,29 +264,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
   */
 object DataSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
       pruneFilterProjectRaw(
         l,
         projects,
         filters,
         (requestedColumns, allPredicates, _) =>
           toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil

-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) =>
+    case PhysicalOperation(
+        projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil

-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil

-    case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
+    case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
       RowDataSourceScanExec(
         l.output,
         toCatalystRDD(l, baseRelation.buildScan()),
