Skip to content

Commit b9542ef

Browse files
committed
Refresh metadata cache in the Catalog in CreateMetastoreDataSourceAsSelect.
1 parent e0e64ba commit b9542ef

File tree

2 files changed

+54
-0
lines changed

2 files changed

+54
-0
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -248,6 +248,8 @@ case class CreateMetastoreDataSourceAsSelect(
248248
isExternal)
249249
}
250250

251+
// Refresh the cache of the table in the catalog.
252+
hiveContext.refreshTable(tableName)
251253
Seq.empty[Row]
252254
}
253255
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 52 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -612,4 +612,56 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
612612
val actualSchema = table("wide_schema").schema
613613
assert(schema === actualSchema)
614614
}
615+
616+
test("insert into a table") {
617+
def createDF(from: Int, to: Int): DataFrame =
618+
createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
619+
620+
createDF(0, 9).saveAsTable("insertParquet", "parquet")
621+
checkAnswer(
622+
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
623+
(6 to 9).map(i => Row(i, s"str$i")))
624+
625+
intercept[AnalysisException] {
626+
createDF(10, 19).saveAsTable("insertParquet", "parquet")
627+
}
628+
629+
createDF(10, 19).saveAsTable("insertParquet", "parquet", SaveMode.Append)
630+
checkAnswer(
631+
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
632+
(6 to 19).map(i => Row(i, s"str$i")))
633+
634+
createDF(20, 29).saveAsTable("insertParquet", "parquet", SaveMode.Append)
635+
checkAnswer(
636+
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
637+
(6 to 24).map(i => Row(i, s"str$i")))
638+
639+
intercept[AnalysisException] {
640+
createDF(30, 39).saveAsTable("insertParquet")
641+
}
642+
643+
createDF(30, 39).saveAsTable("insertParquet", SaveMode.Append)
644+
checkAnswer(
645+
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
646+
(6 to 34).map(i => Row(i, s"str$i")))
647+
648+
createDF(40, 49).insertInto("insertParquet")
649+
checkAnswer(
650+
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
651+
(6 to 44).map(i => Row(i, s"str$i")))
652+
653+
createDF(50, 59).saveAsTable("insertParquet", SaveMode.Overwrite)
654+
checkAnswer(
655+
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
656+
(52 to 54).map(i => Row(i, s"str$i")))
657+
createDF(60, 69).saveAsTable("insertParquet", SaveMode.Ignore)
658+
checkAnswer(
659+
sql("SELECT p.c1, c2 FROM insertParquet p"),
660+
(50 to 59).map(i => Row(i, s"str$i")))
661+
662+
createDF(70, 79).insertInto("insertParquet", overwrite = true)
663+
checkAnswer(
664+
sql("SELECT p.c1, c2 FROM insertParquet p"),
665+
(70 to 79).map(i => Row(i, s"str$i")))
666+
}
615667
}

0 commit comments

Comments (0)