Skip to content

Commit b7c8210

Browse files
linhongliu-dbcloud-fan
authored and committed
[SPARK-33142][SPARK-33647][SQL][FOLLOW-UP] Add docs and test cases
### What changes were proposed in this pull request?

Addressed comments in PR #30567, including:
1. add test case for SPARK-33647 and SPARK-33142
2. add migration guide
3. add `getRawTempView` and `getRawGlobalTempView` to return the raw view info (i.e. TemporaryViewRelation)
4. other minor code clean

### Why are the changes needed?

Code clean and more test cases

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing and newly added test cases

Closes #30666 from linhongliu-db/SPARK-33142-followup.

Lead-authored-by: Linhong Liu <[email protected]>
Co-authored-by: Linhong Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent e7fe92f commit b7c8210

File tree

7 files changed

+79
-52
lines changed

7 files changed

+79
-52
lines changed

docs/sql-migration-guide.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ license: |
5858

5959
- In Spark 3.1, refreshing a table will trigger an uncache operation for all other caches that reference the table, even if the table itself is not cached. In Spark 3.0 the operation will only be triggered if the table itself is cached.
6060

61-
- In Spark 3.1, creating or altering a view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
61+
- In Spark 3.1, creating or altering a permanent view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
62+
63+
- In Spark 3.1, a temporary view will have the same behavior as a permanent view, i.e. capture and store runtime SQL configs, SQL text, catalog and namespace. The captured view properties will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.storeAnalyzedPlanForView` to `true`.
6264

6365
- Since Spark 3.1, CHAR/CHARACTER and VARCHAR types are supported in the table schema. Table scan/insertion will respect the char/varchar semantic. If char/varchar is used in places other than table schema, an exception will be thrown (CAST is an exception that simply treats char/varchar as string like before). To restore the behavior before Spark 3.1, which treats them as STRING types and ignores a length parameter, e.g. `CHAR(4)`, you can set `spark.sql.legacy.charVarcharAsString` to `true`.
6466

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -610,8 +610,16 @@ class SessionCatalog(
610610
/**
611611
* Return a local temporary view exactly as it was stored.
612612
*/
613+
def getRawTempView(name: String): Option[LogicalPlan] = synchronized {
614+
tempViews.get(formatTableName(name))
615+
}
616+
617+
/**
618+
* Generate a [[View]] operator from the view description if the view stores sql text,
619+
 * otherwise, it is the same as `getRawTempView`
620+
*/
613621
def getTempView(name: String): Option[LogicalPlan] = synchronized {
614-
tempViews.get(formatTableName(name)).map(getTempViewPlan)
622+
getRawTempView(name).map(getTempViewPlan)
615623
}
616624

617625
def getTempViewNames(): Seq[String] = synchronized {
@@ -621,8 +629,16 @@ class SessionCatalog(
621629
/**
622630
* Return a global temporary view exactly as it was stored.
623631
*/
632+
def getRawGlobalTempView(name: String): Option[LogicalPlan] = {
633+
globalTempViewManager.get(formatTableName(name))
634+
}
635+
636+
/**
637+
* Generate a [[View]] operator from the view description if the view stores sql text,
638+
 * otherwise, it is the same as `getRawGlobalTempView`
639+
*/
624640
def getGlobalTempView(name: String): Option[LogicalPlan] = {
625-
globalTempViewManager.get(formatTableName(name)).map(getTempViewPlan)
641+
getRawGlobalTempView(name).map(getTempViewPlan)
626642
}
627643

628644
/**
@@ -659,7 +675,7 @@ class SessionCatalog(
659675
def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
660676
val table = formatTableName(name.table)
661677
if (name.database.isEmpty) {
662-
getTempView(table).map {
678+
tempViews.get(table).map {
663679
case TemporaryViewRelation(metadata) => metadata
664680
case plan =>
665681
CatalogTable(
@@ -669,7 +685,6 @@ class SessionCatalog(
669685
schema = plan.output.toStructType)
670686
}.getOrElse(getTableMetadata(name))
671687
} else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
672-
val a = globalTempViewManager.get(table)
673688
globalTempViewManager.get(table).map {
674689
case TemporaryViewRelation(metadata) => metadata
675690
case plan =>
@@ -810,21 +825,34 @@ class SessionCatalog(
810825
// The relation is a view, so we wrap the relation by:
811826
// 1. Add a [[View]] operator over the relation to keep track of the view desc;
812827
// 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
813-
val child = View.fromCatalogTable(metadata, isTempView = false, parser)
814-
SubqueryAlias(multiParts, child)
828+
SubqueryAlias(multiParts, fromCatalogTable(metadata, isTempView = false))
815829
} else {
816830
SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options))
817831
}
818832
}
819833

820-
def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
834+
private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
821835
plan match {
822836
case viewInfo: TemporaryViewRelation =>
823-
View.fromCatalogTable(viewInfo.tableMeta, isTempView = true, parser)
837+
fromCatalogTable(viewInfo.tableMeta, isTempView = true)
824838
case v => v
825839
}
826840
}
827841

842+
private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
843+
val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
844+
val viewConfigs = metadata.viewSQLConfigs
845+
val viewPlan =
846+
SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView = isTempView)) {
847+
parser.parsePlan(viewText)
848+
}
849+
View(
850+
desc = metadata,
851+
isTempView = isTempView,
852+
output = metadata.schema.toAttributes,
853+
child = viewPlan)
854+
}
855+
828856
def lookupTempView(table: String): Option[SubqueryAlias] = {
829857
val formattedTable = formatTableName(table)
830858
getTempView(formattedTable).map { view =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateView, MultiInstanceRelat
2222
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
2323
import org.apache.spark.sql.catalyst.expressions._
2424
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
25-
import org.apache.spark.sql.catalyst.parser.ParserInterface
2625
import org.apache.spark.sql.catalyst.plans._
2726
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
2827
import org.apache.spark.sql.catalyst.util.truncatedString
@@ -485,21 +484,6 @@ object View {
485484
}
486485
sqlConf
487486
}
488-
489-
def fromCatalogTable(
490-
metadata: CatalogTable, isTempView: Boolean, parser: ParserInterface): View = {
491-
val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
492-
val viewConfigs = metadata.viewSQLConfigs
493-
val viewPlan =
494-
SQLConf.withExistingConf(effectiveSQLConf(viewConfigs, isTempView = isTempView)) {
495-
parser.parsePlan(viewText)
496-
}
497-
View(
498-
desc = metadata,
499-
isTempView = isTempView,
500-
output = metadata.schema.toAttributes,
501-
child = viewPlan)
502-
}
503487
}
504488

505489
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,8 @@ case class CreateViewCommand(
113113
verifyTemporaryObjectsNotExists(catalog, isTemporary, name, child)
114114

115115
if (viewType == LocalTempView) {
116-
val shouldUncache = replace && catalog.getTempView(name.table).exists {
117-
// Uncache View logical plan without checking the same result check, since it's unresolved.
118-
case _: View => true
119-
case other => !other.sameResult(child)
120-
}
121-
if (shouldUncache) {
116+
if (replace && catalog.getRawTempView(name.table).isDefined &&
117+
!catalog.getRawTempView(name.table).get.sameResult(child)) {
122118
logInfo(s"Try to uncache ${name.quotedString} before replacing.")
123119
checkCyclicViewReference(analyzedPlan, Seq(name), name)
124120
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
@@ -141,12 +137,8 @@ case class CreateViewCommand(
141137
} else if (viewType == GlobalTempView) {
142138
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
143139
val viewIdent = TableIdentifier(name.table, Option(db))
144-
val shouldUncache = replace && catalog.getGlobalTempView(name.table).exists {
145-
// Uncache View logical plan without checking the same result check, since it's unresolved.
146-
case _: View => true
147-
case other => !other.sameResult(child)
148-
}
149-
if (shouldUncache) {
140+
if (replace && catalog.getRawGlobalTempView(name.table).isDefined &&
141+
!catalog.getRawGlobalTempView(name.table).get.sameResult(child)) {
150142
logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.")
151143
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
152144
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,4 +1272,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
12721272
}
12731273
}
12741274
}
1275+
1276+
test("SPARK-33647: cache table support for permanent view") {
1277+
withView("v1") {
1278+
spark.catalog.clearCache()
1279+
sql("create or replace view v1 as select 1")
1280+
sql("cache table v1")
1281+
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isDefined)
1282+
sql("create or replace view v1 as select 1, 2")
1283+
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isEmpty)
1284+
sql("cache table v1")
1285+
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
1286+
}
1287+
}
12751288
}

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -812,20 +812,6 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
812812
}
813813
}
814814

815-
test("creating local temp view should not affect existing table reference") {
816-
withTable("t") {
817-
withTempView("t") {
818-
withGlobalTempView("v") {
819-
val globalTempDB = spark.sharedState.globalTempViewManager.database
820-
Seq(2).toDF("c1").write.format("parquet").saveAsTable("t")
821-
sql("CREATE GLOBAL TEMPORARY VIEW v AS SELECT * FROM t")
822-
sql("CREATE TEMPORARY VIEW t AS SELECT 1")
823-
checkAnswer(sql(s"SELECT * FROM ${globalTempDB}.v"), Seq(Row(2)))
824-
}
825-
}
826-
}
827-
}
828-
829815
test("SPARK-33141: view should be parsed and analyzed with configs set when creating") {
830816
withTable("t") {
831817
withView("v1", "v2", "v3", "v4", "v5") {

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,29 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
200200
}
201201
}
202202

203+
test("view should use captured catalog and namespace to resolve relation") {
204+
withTempDatabase { dbName =>
205+
withTable("default.t", s"$dbName.t") {
206+
withTempView("t") {
207+
// create a table in default database
208+
sql("USE DEFAULT")
209+
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
210+
// create a view refer the created table in default database
211+
val viewName = createView("v1", "SELECT * FROM t")
212+
// using another database to create a table with same name
213+
sql(s"USE $dbName")
214+
Seq(4, 5, 6).toDF("c1").write.format("parquet").saveAsTable("t")
215+
// create a temporary view with the same name
216+
sql("CREATE TEMPORARY VIEW t AS SELECT 1")
217+
withView(viewName) {
218+
// view v1 should still refer the table defined in `default` database
219+
checkViewOutput(viewName, Seq(Row(2), Row(3), Row(1)))
220+
}
221+
}
222+
}
223+
}
224+
}
225+
203226
test("SPARK-33692: view should use captured catalog and namespace to lookup function") {
204227
val avgFuncClass = "test.org.apache.spark.sql.MyDoubleAvg"
205228
val sumFuncClass = "test.org.apache.spark.sql.MyDoubleSum"
@@ -231,7 +254,6 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
231254
class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
232255
override protected def viewTypeString: String = "TEMPORARY VIEW"
233256
override protected def formattedViewName(viewName: String): String = viewName
234-
235257
}
236258

237259
class GlobalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {

0 commit comments

Comments
 (0)