Skip to content

Commit f1c86b8

Browse files
committed
initial commit
1 parent 6813754 commit f1c86b8

File tree

8 files changed

+97
-92
lines changed

8 files changed

+97
-92
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import javax.annotation.concurrent.GuardedBy
2222
import scala.collection.mutable
2323

2424
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
25-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2625
import org.apache.spark.sql.catalyst.util.StringUtils
2726
import org.apache.spark.sql.errors.QueryCompilationErrors
2827

@@ -40,12 +39,12 @@ class GlobalTempViewManager(val database: String) {
4039

4140
/** List of view definitions, mapping from view name to logical plan. */
4241
@GuardedBy("this")
43-
private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
42+
private val viewDefinitions = new mutable.HashMap[String, TemporaryViewRelation]
4443

4544
/**
4645
* Returns the global view definition which matches the given name, or None if not found.
4746
*/
48-
def get(name: String): Option[LogicalPlan] = synchronized {
47+
def get(name: String): Option[TemporaryViewRelation] = synchronized {
4948
viewDefinitions.get(name)
5049
}
5150

@@ -55,7 +54,7 @@ class GlobalTempViewManager(val database: String) {
5554
*/
5655
def create(
5756
name: String,
58-
viewDefinition: LogicalPlan,
57+
viewDefinition: TemporaryViewRelation,
5958
overrideIfExists: Boolean): Unit = synchronized {
6059
if (!overrideIfExists && viewDefinitions.contains(name)) {
6160
throw new TempTableAlreadyExistsException(name)
@@ -68,7 +67,7 @@ class GlobalTempViewManager(val database: String) {
6867
*/
6968
def update(
7069
name: String,
71-
viewDefinition: LogicalPlan): Boolean = synchronized {
70+
viewDefinition: TemporaryViewRelation): Boolean = synchronized {
7271
if (viewDefinitions.contains(name)) {
7372
viewDefinitions.put(name, viewDefinition)
7473
true

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 17 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class SessionCatalog(
101101

102102
/** List of temporary views, mapping from table name to their logical plan. */
103103
@GuardedBy("this")
104-
protected val tempViews = new mutable.HashMap[String, LogicalPlan]
104+
protected val tempViews = new mutable.HashMap[String, TemporaryViewRelation]
105105

106106
// Note: we track current database here because certain operations do not explicitly
107107
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
@@ -573,21 +573,21 @@ class SessionCatalog(
573573
*/
574574
def createTempView(
575575
name: String,
576-
tableDefinition: LogicalPlan,
576+
viewDefinition: TemporaryViewRelation,
577577
overrideIfExists: Boolean): Unit = synchronized {
578578
val table = formatTableName(name)
579579
if (tempViews.contains(table) && !overrideIfExists) {
580580
throw new TempTableAlreadyExistsException(name)
581581
}
582-
tempViews.put(table, tableDefinition)
582+
tempViews.put(table, viewDefinition)
583583
}
584584

585585
/**
586586
* Create a global temporary view.
587587
*/
588588
def createGlobalTempView(
589589
name: String,
590-
viewDefinition: LogicalPlan,
590+
viewDefinition: TemporaryViewRelation,
591591
overrideIfExists: Boolean): Unit = {
592592
globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
593593
}
@@ -598,7 +598,7 @@ class SessionCatalog(
598598
*/
599599
def alterTempViewDefinition(
600600
name: TableIdentifier,
601-
viewDefinition: LogicalPlan): Boolean = synchronized {
601+
viewDefinition: TemporaryViewRelation): Boolean = synchronized {
602602
val viewName = formatTableName(name.table)
603603
if (name.database.isEmpty) {
604604
if (tempViews.contains(viewName)) {
@@ -617,14 +617,14 @@ class SessionCatalog(
617617
/**
618618
* Return a local temporary view exactly as it was stored.
619619
*/
620-
def getRawTempView(name: String): Option[LogicalPlan] = synchronized {
620+
def getRawTempView(name: String): Option[TemporaryViewRelation] = synchronized {
621621
tempViews.get(formatTableName(name))
622622
}
623623

624624
/**
625625
* Generate a [[View]] operator from the temporary view stored.
626626
*/
627-
def getTempView(name: String): Option[LogicalPlan] = synchronized {
627+
def getTempView(name: String): Option[View] = synchronized {
628628
getRawTempView(name).map(getTempViewPlan)
629629
}
630630

@@ -635,14 +635,14 @@ class SessionCatalog(
635635
/**
636636
* Return a global temporary view exactly as it was stored.
637637
*/
638-
def getRawGlobalTempView(name: String): Option[LogicalPlan] = {
638+
def getRawGlobalTempView(name: String): Option[TemporaryViewRelation] = {
639639
globalTempViewManager.get(formatTableName(name))
640640
}
641641

642642
/**
643643
* Generate a [[View]] operator from the global temporary view stored.
644644
*/
645-
def getGlobalTempView(name: String): Option[LogicalPlan] = {
645+
def getGlobalTempView(name: String): Option[View] = {
646646
getRawGlobalTempView(name).map(getTempViewPlan)
647647
}
648648

@@ -680,25 +680,10 @@ class SessionCatalog(
680680
def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
681681
val table = formatTableName(name.table)
682682
if (name.database.isEmpty) {
683-
tempViews.get(table).map {
684-
case TemporaryViewRelation(metadata, _) => metadata
685-
case plan =>
686-
CatalogTable(
687-
identifier = TableIdentifier(table),
688-
tableType = CatalogTableType.VIEW,
689-
storage = CatalogStorageFormat.empty,
690-
schema = plan.output.toStructType)
691-
}.getOrElse(getTableMetadata(name))
683+
tempViews.get(table).map(_.tableMeta).getOrElse(getTableMetadata(name))
692684
} else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
693-
globalTempViewManager.get(table).map {
694-
case TemporaryViewRelation(metadata, _) => metadata
695-
case plan =>
696-
CatalogTable(
697-
identifier = TableIdentifier(table, Some(globalTempViewManager.database)),
698-
tableType = CatalogTableType.VIEW,
699-
storage = CatalogStorageFormat.empty,
700-
schema = plan.output.toStructType)
701-
}.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
685+
globalTempViewManager.get(table).map(_.tableMeta)
686+
.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
702687
} else {
703688
getTableMetadata(name)
704689
}
@@ -834,20 +819,11 @@ class SessionCatalog(
834819
}
835820
}
836821

837-
private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
838-
plan match {
839-
case TemporaryViewRelation(tableMeta, None) =>
840-
fromCatalogTable(tableMeta, isTempView = true)
841-
case TemporaryViewRelation(tableMeta, Some(plan)) =>
842-
View(desc = tableMeta, isTempView = true, child = plan)
843-
case other => other
844-
}
845-
}
846-
847-
def getTempViewSchema(plan: LogicalPlan): StructType = {
848-
plan match {
849-
case viewInfo: TemporaryViewRelation => viewInfo.tableMeta.schema
850-
case v => v.schema
822+
private def getTempViewPlan(viewInfo: TemporaryViewRelation): View = {
823+
if (viewInfo.plan.isEmpty) {
824+
fromCatalogTable(viewInfo.tableMeta, isTempView = true)
825+
} else {
826+
View(desc = viewInfo.tableMeta, isTempView = true, child = viewInfo.plan.get)
851827
}
852828
}
853829

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,62 @@ import java.net.URI
2121
import java.util.Locale
2222

2323
import org.apache.spark.sql.AnalysisException
24-
import org.apache.spark.sql.catalyst.QueryPlanningTracker
25-
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
24+
import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier}
25+
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog, TemporaryViewRelation}
26+
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
2627
import org.apache.spark.sql.catalyst.parser.ParseException
2728
import org.apache.spark.sql.catalyst.plans.PlanTest
2829
import org.apache.spark.sql.catalyst.plans.logical._
2930
import org.apache.spark.sql.catalyst.rules.Rule
30-
import org.apache.spark.sql.internal.SQLConf
31+
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
3132

3233
trait AnalysisTest extends PlanTest {
3334

3435
protected def extendedAnalysisRules: Seq[Rule[LogicalPlan]] = Nil
3536

37+
protected def createTempView(
38+
catalog: SessionCatalog,
39+
name: String,
40+
plan: LogicalPlan,
41+
overrideIfExists: Boolean): Unit = {
42+
val identifier = TableIdentifier(name)
43+
val metadata = CatalogTable(
44+
identifier = identifier,
45+
tableType = CatalogTableType.VIEW,
46+
storage = CatalogStorageFormat.empty,
47+
schema = plan.schema,
48+
properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")))
49+
val viewDefinition = TemporaryViewRelation(metadata, Some(plan))
50+
catalog.createTempView(name, viewDefinition, overrideIfExists)
51+
}
52+
53+
protected def createGlobalTempView(
54+
catalog: SessionCatalog,
55+
name: String,
56+
plan: LogicalPlan,
57+
overrideIfExists: Boolean): Unit = {
58+
val globalDb = Some(SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE))
59+
val identifier = TableIdentifier(name, globalDb)
60+
val metadata = CatalogTable(
61+
identifier = identifier,
62+
tableType = CatalogTableType.VIEW,
63+
storage = CatalogStorageFormat.empty,
64+
schema = plan.schema,
65+
properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")))
66+
val viewDefinition = TemporaryViewRelation(metadata, Some(plan))
67+
catalog.createGlobalTempView(name, viewDefinition, overrideIfExists)
68+
}
69+
3670
protected def getAnalyzer: Analyzer = {
3771
val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin)
3872
catalog.createDatabase(
3973
CatalogDatabase("default", "", new URI("loc"), Map.empty),
4074
ignoreIfExists = false)
41-
catalog.createTempView("TaBlE", TestRelations.testRelation, overrideIfExists = true)
42-
catalog.createTempView("TaBlE2", TestRelations.testRelation2, overrideIfExists = true)
43-
catalog.createTempView("TaBlE3", TestRelations.testRelation3, overrideIfExists = true)
44-
catalog.createGlobalTempView("TaBlE4", TestRelations.testRelation4, overrideIfExists = true)
45-
catalog.createGlobalTempView("TaBlE5", TestRelations.testRelation5, overrideIfExists = true)
75+
createTempView(catalog, "TaBlE", TestRelations.testRelation, overrideIfExists = true)
76+
createTempView(catalog, "TaBlE2", TestRelations.testRelation2, overrideIfExists = true)
77+
createTempView(catalog, "TaBlE3", TestRelations.testRelation3, overrideIfExists = true)
78+
createGlobalTempView(catalog, "TaBlE4", TestRelations.testRelation4, overrideIfExists = true)
79+
createGlobalTempView(catalog, "TaBlE5", TestRelations.testRelation5, overrideIfExists = true)
4680
new Analyzer(catalog) {
4781
override val extendedResolutionRules = EliminateSubqueryAliases +: extendedAnalysisRules
4882
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project, Unio
2828
import org.apache.spark.sql.internal.SQLConf
2929
import org.apache.spark.sql.types._
3030

31-
3231
class DecimalPrecisionSuite extends AnalysisTest with BeforeAndAfter {
3332
private val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry)
3433
private val analyzer = new Analyzer(catalog)
@@ -49,10 +48,6 @@ class DecimalPrecisionSuite extends AnalysisTest with BeforeAndAfter {
4948
private val f: Expression = UnresolvedAttribute("f")
5049
private val b: Expression = UnresolvedAttribute("b")
5150

52-
before {
53-
catalog.createTempView("table", relation, overrideIfExists = true)
54-
}
55-
5651
private def checkType(expression: Expression, expectedType: DataType): Unit = {
5752
val plan = Project(Seq(Alias(expression, "c")()), relation)
5853
assert(analyzer.execute(plan).schema.fields(0).dataType === expectedType)

0 commit comments

Comments
 (0)