
Commit 3e7beb8

set expectedOutputAttributes
Parent: 7e418e9

2 files changed: +20, -6 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 7 additions & 3 deletions
@@ -197,7 +197,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
  * source information.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
+  private def readDataSourceTable(
+      sparkSession: SparkSession,
+      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
+    val table = simpleCatalogRelation.catalogTable
     val dataSource =
       DataSource(
         sparkSession,
@@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
     LogicalRelation(
       dataSource.resolveRelation(),
+      expectedOutputAttributes = Some(simpleCatalogRelation.output),
       catalogTable = Some(table))
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(sparkSession, s.metadata))
+      i.copy(table = readDataSourceTable(sparkSession, s))
 
     case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(sparkSession, s.metadata)
+      readDataSourceTable(sparkSession, s)
   }
 }
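Why the new argument matters: dataSource.resolveRelation() rebuilds the schema from the underlying data source, which would otherwise mint fresh exprIds for the output attributes; passing expectedOutputAttributes = Some(simpleCatalogRelation.output) lets the LogicalRelation reuse the catalog relation's attribute IDs, so references already resolved against the SimpleCatalogRelation still bind after this rule rewrites the plan. A minimal sketch of how a relation can pin its output to the expected attributes (a paraphrase of the idea, not the exact LogicalRelation code; pinOutput is a hypothetical helper):

import org.apache.spark.sql.catalyst.expressions.AttributeReference

// Sketch: keep the freshly resolved schema, but reuse the expected
// attributes' exprIds so references bound to the old relation still resolve.
// Assumes same arity and order, which holds when the schema is unchanged.
def pinOutput(
    fresh: Seq[AttributeReference],
    expected: Option[Seq[AttributeReference]]): Seq[AttributeReference] = {
  expected match {
    case Some(exp) =>
      assert(exp.length == fresh.length)
      fresh.zip(exp).map { case (attr, e) => attr.withExprId(e.exprId) }
    case None => fresh
  }
}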

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 13 additions & 3 deletions
@@ -22,9 +22,7 @@ import java.math.MathContext
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.{AccumulatorSuite, SparkException}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.catalyst.plans.logical.Aggregate
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -2310,6 +2308,18 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("data source table created in InMemoryCatalog should guarantee resolving consistency") {
+    val table = "tbl"
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
+      val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(table)
+      val relation = spark.sessionState.catalog.lookupRelation(tableIdent)
+      val expr = relation.resolve("i")
+      val plan = Dataset.ofRows(spark, Project(Seq(expr), relation))
+      plan.queryExecution.assertAnalyzed()
+    }
+  }
+
   test("Eliminate noop ordinal ORDER BY") {
     withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "true") {
       val plan1 = sql("SELECT 1.0, 'abc', year(current_date()) ORDER BY 1, 2, 3")
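What the test pins down: relation.resolve("i") binds to an exprId taken from the SimpleCatalogRelation, and FindDataSourceTable later replaces that relation with a LogicalRelation; before this fix the replacement carried fresh exprIds, so the Project referenced an attribute missing from its child and analysis failed. The invariant, roughly (a hedged sketch that applies the rule directly, which the suite itself does not do):

// Rewriting the relation must not change its output exprIds.
val before = relation.output.map(_.exprId)
val after  = new FindDataSourceTable(spark).apply(relation).output.map(_.exprId)
assert(before == after)  // pre-fix, `after` held fresh exprIds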
