
Commit a42b8b9

address comments
1 parent 6f785e3 commit a42b8b9

1 file changed: +90 -51 lines


sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala

Lines changed: 90 additions & 51 deletions
@@ -19,62 +19,72 @@ package org.apache.spark.sql.hive
 
 import java.net.URI
 
-import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 
-class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestHiveSingleton {
+class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
+  with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
 
   // To test `HiveExternalCatalog`, we need to read/write the raw table meta from/to hive client.
   val hiveClient: HiveClient =
     spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
 
   val tempDir = Utils.createTempDir().getCanonicalFile
 
-  override def beforeAll(): Unit = {
-    for ((tbl, _, _) <- rawTablesAndExpectations) {
+  override def beforeEach(): Unit = {
+    sql("CREATE DATABASE test_db")
+    for ((tbl, _) <- rawTablesAndExpectations) {
       hiveClient.createTable(tbl, ignoreIfExists = false)
     }
   }
 
-  override def afterAll(): Unit = {
+  override def afterEach(): Unit = {
     Utils.deleteRecursively(tempDir)
-    for (i <- 1 to rawTablesAndExpectations.length) {
-      hiveClient.dropTable("default", s"tbl$i", ignoreIfNotExists = true, purge = false)
-    }
+    hiveClient.dropDatabase("test_db", ignoreIfNotExists = false, cascade = true)
+  }
+
+  private def getTableMetadata(tableName: String): CatalogTable = {
+    spark.sharedState.externalCatalog.getTable("test_db", tableName)
   }
 
+  private def defaultTablePath(tableName: String): String = {
+    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
+  }
 
-  // Raw table metadata that are dumped from tables created by Spark 2.0
+
+  // Raw table metadata that are dumped from tables created by Spark 2.0. Note that, all spark
+  // versions prior to 2.1 would generate same raw table metadata for a specific table.
   val simpleSchema = new StructType().add("i", "int")
   val partitionedSchema = new StructType().add("i", "int").add("j", "int")
 
-  val hiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl1", Some("default")),
+  lazy val hiveTable = CatalogTable(
+    identifier = TableIdentifier("tbl1", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(
       inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
       outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
     schema = simpleSchema)
 
-  val externalHiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl2", Some("default")),
+  lazy val externalHiveTable = CatalogTable(
+    identifier = TableIdentifier("tbl2", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(tempDir.getCanonicalPath),
+      locationUri = Some(tempDir.getAbsolutePath),
      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
    schema = simpleSchema)
 
-  val partitionedHiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl3", Some("default")),
+  lazy val partitionedHiveTable = CatalogTable(
+    identifier = TableIdentifier("tbl3", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(
       inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
@@ -115,12 +125,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       |}
     """.stripMargin
 
-  def defaultTablePath(tableName: String): String = {
-    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName))
-  }
-
-  val dataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl4", Some("default")),
+  lazy val dataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl4", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> defaultTablePath("tbl4"))),
     schema = new StructType(),
@@ -129,8 +135,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       "spark.sql.sources.schema.numParts" -> "1",
       "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
 
-  val hiveCompatibleDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl5", Some("default")),
+  lazy val hiveCompatibleDataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl5", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> defaultTablePath("tbl5"))),
     schema = simpleSchema,
@@ -139,8 +145,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       "spark.sql.sources.schema.numParts" -> "1",
       "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
 
-  val partitionedDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl6", Some("default")),
+  lazy val partitionedDataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl6", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> defaultTablePath("tbl6"))),
     schema = new StructType(),
@@ -151,20 +157,20 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       "spark.sql.sources.schema.numPartCols" -> "1",
       "spark.sql.sources.schema.partCol.0" -> "j"))
 
-  val externalDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl7", Some("default")),
+  lazy val externalDataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl7", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(new Path(defaultTablePath("tbl7"), "-__PLACEHOLDER__").toString),
+      locationUri = Some(defaultTablePath("tbl7") + "-__PLACEHOLDER__"),
       properties = Map("path" -> tempDir.getAbsolutePath)),
     schema = new StructType(),
     properties = Map(
       "spark.sql.sources.provider" -> "json",
       "spark.sql.sources.schema.numParts" -> "1",
       "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
 
-  val hiveCompatibleExternalDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl8", Some("default")),
+  lazy val hiveCompatibleExternalDataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl8", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
       locationUri = Some(tempDir.getAbsolutePath),
@@ -175,38 +181,71 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       "spark.sql.sources.schema.numParts" -> "1",
       "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
 
-  val dataSourceTableWithoutSchema = CatalogTable(
-    identifier = TableIdentifier("tbl9", Some("default")),
+  lazy val dataSourceTableWithoutSchema = CatalogTable(
+    identifier = TableIdentifier("tbl9", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(new Path(defaultTablePath("tbl9"), "-__PLACEHOLDER__").toString),
+      locationUri = Some(defaultTablePath("tbl9") + "-__PLACEHOLDER__"),
       properties = Map("path" -> tempDir.getAbsolutePath)),
     schema = new StructType(),
     properties = Map("spark.sql.sources.provider" -> "json"))
 
-  // A list of all raw tables we want to test, with their expected schema and table location.
-  val rawTablesAndExpectations = Seq(
-    (hiveTable, simpleSchema, None),
-    (externalHiveTable, simpleSchema, Some(tempDir.getCanonicalPath)),
-    (partitionedHiveTable, partitionedSchema, None),
-    (dataSourceTable, simpleSchema, None),
-    (hiveCompatibleDataSourceTable, simpleSchema, None),
-    (partitionedDataSourceTable, partitionedSchema, None),
-    (externalDataSourceTable, simpleSchema, Some(tempDir.getCanonicalPath)),
-    (hiveCompatibleExternalDataSourceTable, simpleSchema, Some(tempDir.getCanonicalPath)),
-    (dataSourceTableWithoutSchema, new StructType(), None))
+  // A list of all raw tables we want to test, with their expected schema.
+  lazy val rawTablesAndExpectations = Seq(
+    hiveTable -> simpleSchema,
+    externalHiveTable -> simpleSchema,
+    partitionedHiveTable -> partitionedSchema,
+    dataSourceTable -> simpleSchema,
+    hiveCompatibleDataSourceTable -> simpleSchema,
+    partitionedDataSourceTable -> partitionedSchema,
+    externalDataSourceTable -> simpleSchema,
+    hiveCompatibleExternalDataSourceTable -> simpleSchema,
+    dataSourceTableWithoutSchema -> new StructType())
 
   test("make sure we can read table created by old version of Spark") {
-    for ((tbl, expectedSchema, expectedLocation) <- rawTablesAndExpectations) {
-      val readBack = spark.sharedState.externalCatalog.getTable(
-        tbl.identifier.database.get, tbl.identifier.table)
-
+    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
+      val readBack = getTableMetadata(tbl.identifier.table)
       assert(readBack.schema == expectedSchema)
-      expectedLocation.foreach { loc =>
+
+      if (tbl.tableType == CatalogTableType.EXTERNAL) {
         // trim the URI prefix
         val tableLocation = new URI(readBack.storage.locationUri.get).getPath
-        assert(tableLocation == loc)
+        assert(tableLocation == tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("make sure we can alter table location created by old version of Spark") {
+    withTempDir { dir =>
+      for ((tbl, _) <- rawTablesAndExpectations if tbl.tableType == CatalogTableType.EXTERNAL) {
+        sql(s"ALTER TABLE ${tbl.identifier} SET LOCATION '${dir.getAbsolutePath}'")
+
+        val readBack = getTableMetadata(tbl.identifier.table)
+
+        // trim the URI prefix
+        val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+        assert(actualTableLocation == dir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("make sure we can rename table created by old version of Spark") {
+    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
+      val newName = tbl.identifier.table + "_renamed"
+      sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
+
+      val readBack = getTableMetadata(newName)
+      assert(readBack.schema == expectedSchema)
+
+      // trim the URI prefix
+      val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+      val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) {
+        tempDir.getAbsolutePath
+      } else {
+        // trim the URI prefix
+        new URI(defaultTablePath(newName)).getPath
       }
+      assert(actualTableLocation == expectedLocation)
     }
   }
 }
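The sketch below is not part of the commit. It only illustrates why the switch from beforeAll/afterAll to BeforeAndAfterEach matters here: the new ALTER TABLE ... RENAME TO and SET LOCATION tests mutate the fixture tables, and recreating test_db before every test keeps the tests independent. The class name PerTestFixtureExample and the mutable map standing in for the Hive catalog are illustrative only; ScalaTest 3.x (AnyFunSuite) is assumed.

import scala.collection.mutable
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

class PerTestFixtureExample extends AnyFunSuite with BeforeAndAfterEach {

  // Stand-in for the "test_db" catalog: rebuilt before every test.
  private val tables = mutable.Map.empty[String, String]

  override def beforeEach(): Unit = {
    // analogous to sql("CREATE DATABASE test_db") plus hiveClient.createTable(...)
    tables ++= Seq("tbl1" -> "loc1", "tbl2" -> "loc2")
  }

  override def afterEach(): Unit = {
    // analogous to hiveClient.dropDatabase("test_db", ignoreIfNotExists = false, cascade = true)
    tables.clear()
  }

  test("a test may rename a table without affecting later tests") {
    val loc = tables.remove("tbl1").get   // like ALTER TABLE tbl1 RENAME TO tbl1_renamed
    tables("tbl1_renamed") = loc
    assert(tables.contains("tbl1_renamed") && !tables.contains("tbl1"))
  }

  test("the original fixture is back for the next test") {
    assert(tables.contains("tbl1"))       // beforeEach recreated it
  }
}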
