Skip to content

Commit f049fa1

Browse files
committed
address comment
1 parent a0ab11d commit f049fa1

File tree

8 files changed

+144
-124
lines changed

8 files changed

+144
-124
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -846,10 +846,9 @@ class Analyzer(override val catalogManager: CatalogManager)
846846
object ResolveTempViews extends Rule[LogicalPlan] {
847847
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
848848
case u @ UnresolvedRelation(ident, _, isStreaming) =>
849-
lookupTempView(ident, isStreaming).map(ResolveRelations.resolveViews).getOrElse(u)
849+
lookupTempView(ident, isStreaming).getOrElse(u)
850850
case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) =>
851851
lookupTempView(ident)
852-
.map(ResolveRelations.resolveViews)
853852
.map(view => i.copy(table = view))
854853
.getOrElse(i)
855854
// TODO (SPARK-27484): handle streaming write commands when we have them.
@@ -895,7 +894,7 @@ class Analyzer(override val catalogManager: CatalogManager)
895894
throw new AnalysisException(s"${identifier.quoted} is not a temp view of streaming " +
896895
s"logical plan, please use batch API such as `DataFrameReader.table` to read it.")
897896
}
898-
tmpView
897+
tmpView.map(ResolveRelations.resolveViews)
899898
}
900899
}
901900

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ class SessionCatalog(
606606
* Return a local temporary view exactly as it was stored.
607607
*/
608608
def getTempView(name: String): Option[LogicalPlan] = synchronized {
609-
tempViews.get(formatTableName(name)).map(parseTempViewPlan)
609+
tempViews.get(formatTableName(name)).map(getTempViewPlan)
610610
}
611611

612612
def getTempViewNames(): Seq[String] = synchronized {
@@ -617,7 +617,7 @@ class SessionCatalog(
617617
* Return a global temporary view exactly as it was stored.
618618
*/
619619
def getGlobalTempView(name: String): Option[LogicalPlan] = {
620-
globalTempViewManager.get(formatTableName(name)).map(parseTempViewPlan)
620+
globalTempViewManager.get(formatTableName(name)).map(getTempViewPlan)
621621
}
622622

623623
/**
@@ -781,13 +781,13 @@ class SessionCatalog(
781781
val table = formatTableName(name.table)
782782
if (db == globalTempViewManager.database) {
783783
globalTempViewManager.get(table).map { viewDef =>
784-
SubqueryAlias(table, db, parseTempViewPlan(viewDef))
784+
SubqueryAlias(table, db, getTempViewPlan(viewDef))
785785
}.getOrElse(throw new NoSuchTableException(db, table))
786786
} else if (name.database.isDefined || !tempViews.contains(table)) {
787787
val metadata = externalCatalog.getTable(db, table)
788788
getRelation(metadata)
789789
} else {
790-
SubqueryAlias(table, parseTempViewPlan(tempViews(table)))
790+
SubqueryAlias(table, getTempViewPlan(tempViews(table)))
791791
}
792792
}
793793
}
@@ -801,43 +801,20 @@ class SessionCatalog(
801801
val multiParts = Seq(CatalogManager.SESSION_CATALOG_NAME, db, table)
802802

803803
if (metadata.tableType == CatalogTableType.VIEW) {
804-
val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
805-
val viewConfigs = metadata.viewSQLConfigs
806-
val viewPlan =
807-
SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView = false)) {
808-
parser.parsePlan(viewText)
809-
}
810-
811-
logDebug(s"'$viewText' will be used for the view($table) with configs: $viewConfigs.")
812804
// The relation is a view, so we wrap the relation by:
813805
// 1. Add a [[View]] operator over the relation to keep track of the view desc;
814806
// 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
815-
val child = View(
816-
desc = metadata,
817-
isTempView = false,
818-
output = metadata.schema.toAttributes,
819-
child = viewPlan)
807+
val child = View.fromCatalogTable(metadata, isTempView = false, parser)
820808
SubqueryAlias(multiParts, child)
821809
} else {
822810
SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options))
823811
}
824812
}
825813

826-
def parseTempViewPlan(plan: LogicalPlan): LogicalPlan = {
814+
def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
827815
plan match {
828816
case viewInfo: TemporaryViewRelation =>
829-
val metadata = viewInfo.tableMeta
830-
val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
831-
val viewConfigs = metadata.viewSQLConfigs
832-
val viewPlan =
833-
SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView = true)) {
834-
parser.parsePlan(viewText)
835-
}
836-
View(
837-
desc = metadata,
838-
isTempView = true,
839-
output = metadata.schema.toAttributes,
840-
child = viewPlan)
817+
View.fromCatalogTable(viewInfo.tableMeta, isTempView = true, parser)
841818
case v => v
842819
}
843820
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ case class UnresolvedCatalogRelation(
669669

670670
/**
671671
* A wrapper to store the temporary view info, will be kept in `SessionCatalog`
672-
* and will be transformed to `View` in `getTempView`
672+
* and will be transformed to `View` during analysis
673673
*/
674674
case class TemporaryViewRelation(tableMeta: CatalogTable) extends LeafNode {
675675
override lazy val resolved: Boolean = false

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
2222
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
2323
import org.apache.spark.sql.catalyst.expressions._
2424
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
25+
import org.apache.spark.sql.catalyst.parser.ParserInterface
2526
import org.apache.spark.sql.catalyst.plans._
2627
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
2728
import org.apache.spark.sql.catalyst.util.truncatedString
@@ -469,6 +470,21 @@ object View {
469470
sqlConf.setConf(SQLConf.MAX_NESTED_VIEW_DEPTH, activeConf.maxNestedViewDepth)
470471
sqlConf
471472
}
473+
474+
def fromCatalogTable(
475+
metadata: CatalogTable, isTempView: Boolean, parser: ParserInterface): View = {
476+
val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
477+
val viewConfigs = metadata.viewSQLConfigs
478+
val viewPlan =
479+
SQLConf.withExistingConf(effectiveSQLConf(viewConfigs, isTempView = isTempView)) {
480+
parser.parsePlan(viewText)
481+
}
482+
View(
483+
desc = metadata,
484+
isTempView = isTempView,
485+
output = metadata.schema.toAttributes,
486+
child = viewPlan)
487+
}
472488
}
473489

474490
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1492,7 +1492,7 @@ object SQLConf {
14921492
val STORE_ANALYZED_PLAN_FOR_VIEW =
14931493
buildConf("spark.sql.legacy.storeAnalyzedPlanForView")
14941494
.internal()
1495-
.doc("When true, analyzed plan instead of sql text will be stored when creating " +
1495+
.doc("When true, analyzed plan instead of SQL text will be stored when creating " +
14961496
"temporary view")
14971497
.version("3.1.0")
14981498
.booleanConf

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,14 @@ case class CreateViewCommand(
110110
verifyTemporaryObjectsNotExists(catalog)
111111

112112
if (viewType == LocalTempView) {
113-
if (replace && catalog.getTempView(name.table).isDefined &&
114-
!catalog.getTempView(name.table).get.sameResult(child)) {
113+
val samePlan = catalog.getTempView(name.table).exists {
114+
// Don't perform sameResult check for View logical plan, since it's unresolved
115+
case _: View => false
116+
case other => other.sameResult(child)
117+
}
118+
if (replace && !samePlan) {
115119
logInfo(s"Try to uncache ${name.quotedString} before replacing.")
120+
checkCyclicViewReference(analyzedPlan, Seq(name), name)
116121
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
117122
}
118123
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
@@ -126,16 +131,21 @@ case class CreateViewCommand(
126131
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
127132
} else if (viewType == GlobalTempView) {
128133
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
129-
val globalTempView = TableIdentifier(name.table, Option(db))
130-
if (replace && catalog.getGlobalTempView(name.table).isDefined &&
131-
!catalog.getGlobalTempView(name.table).get.sameResult(child)) {
132-
logInfo(s"Try to uncache ${globalTempView.quotedString} before replacing.")
133-
CommandUtils.uncacheTableOrView(sparkSession, globalTempView.quotedString)
134+
val viewIdent = TableIdentifier(name.table, Option(db))
135+
val samePlan = catalog.getGlobalTempView(name.table).exists {
136+
// Don't perform sameResult check for View logical plan, since it's unresolved
137+
case _: View => false
138+
case other => other.sameResult(child)
139+
}
140+
if (replace && !samePlan) {
141+
logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.")
142+
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
143+
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
134144
}
135145
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
136146
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
137147
TemporaryViewRelation(
138-
prepareTemporaryView(globalTempView, sparkSession, analyzedPlan, aliasedPlan.schema))
148+
prepareTemporaryView(viewIdent, sparkSession, analyzedPlan, aliasedPlan.schema))
139149
} else {
140150
aliasedPlan
141151
}
@@ -261,18 +271,18 @@ case class CreateViewCommand(
261271
viewName: TableIdentifier,
262272
session: SparkSession,
263273
analyzedPlan: LogicalPlan,
264-
aliasedSchema: StructType): CatalogTable = {
274+
viewSchema: StructType): CatalogTable = {
265275

266276
// TBLPROPERTIES is not allowed for temporary view, so we don't use it for
267277
// generating temporary view properties
268278
val newProperties = generateViewProperties(
269-
Map.empty, session, analyzedPlan, aliasedSchema.fieldNames)
279+
Map.empty, session, analyzedPlan, viewSchema.fieldNames)
270280

271281
CatalogTable(
272282
identifier = viewName,
273283
tableType = CatalogTableType.VIEW,
274284
storage = CatalogStorageFormat.empty,
275-
schema = aliasedSchema,
285+
schema = viewSchema,
276286
viewText = originalText,
277287
properties = newProperties)
278288
}
@@ -317,6 +327,8 @@ case class AlterViewAsCommand(
317327
val tableDefinition = if (conf.storeAnalyzedPlanForView) {
318328
analyzedPlan
319329
} else {
330+
checkCyclicViewReference(analyzedPlan, Seq(name), name)
331+
320332
val properties = generateViewProperties(
321333
Map.empty, session, analyzedPlan, analyzedPlan.schema.fieldNames)
322334

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,11 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
784784
withView("v1") {
785785
withSQLConf(STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") {
786786
sql("CREATE TEMPORARY VIEW v1 AS SELECT * FROM t")
787+
Seq(4, 6, 5).toDF("c1").write.mode("overwrite").format("parquet").saveAsTable("t")
788+
val e = intercept[SparkException] {
789+
sql("SELECT * FROM v1").collect()
790+
}.getMessage
791+
assert(e.contains("does not exist"))
787792
}
788793

789794
withSQLConf(STORE_ANALYZED_PLAN_FOR_VIEW.key -> "false") {

0 commit comments

Comments
 (0)