@@ -19,62 +19,72 @@ package org.apache.spark.sql.hive
 
 import java.net.URI
 
-import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 
-class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestHiveSingleton {
+class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
+  with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
 
   // To test `HiveExternalCatalog`, we need to read/write the raw table meta from/to hive client.
   val hiveClient: HiveClient =
     spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
 
   val tempDir = Utils.createTempDir().getCanonicalFile
 
-  override def beforeAll(): Unit = {
-    for ((tbl, _, _) <- rawTablesAndExpectations) {
+  override def beforeEach(): Unit = {
+    sql("CREATE DATABASE test_db")
+    for ((tbl, _) <- rawTablesAndExpectations) {
       hiveClient.createTable(tbl, ignoreIfExists = false)
     }
   }
 
-  override def afterAll(): Unit = {
+  override def afterEach(): Unit = {
     Utils.deleteRecursively(tempDir)
-    for (i <- 1 to rawTablesAndExpectations.length) {
-      hiveClient.dropTable("default", s"tbl$i", ignoreIfNotExists = true, purge = false)
-    }
+    hiveClient.dropDatabase("test_db", ignoreIfNotExists = false, cascade = true)
+  }
+
+  private def getTableMetadata(tableName: String): CatalogTable = {
+    spark.sharedState.externalCatalog.getTable("test_db", tableName)
   }
 
+  private def defaultTablePath(tableName: String): String = {
+    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
+  }
 
-  // Raw table metadata that are dumped from tables created by Spark 2.0
+
+  // Raw table metadata that are dumped from tables created by Spark 2.0. Note that, all spark
+  // versions prior to 2.1 would generate same raw table metadata for a specific table.
   val simpleSchema = new StructType().add("i", "int")
   val partitionedSchema = new StructType().add("i", "int").add("j", "int")
 
-  val hiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl1", Some("default")),
+  lazy val hiveTable = CatalogTable(
+    identifier = TableIdentifier("tbl1", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(
       inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
       outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
     schema = simpleSchema)
 
-  val externalHiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl2", Some("default")),
+  lazy val externalHiveTable = CatalogTable(
+    identifier = TableIdentifier("tbl2", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(tempDir.getCanonicalPath),
+      locationUri = Some(tempDir.getAbsolutePath),
       inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
       outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
     schema = simpleSchema)
 
-  val partitionedHiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl3", Some("default")),
+  lazy val partitionedHiveTable = CatalogTable(
+    identifier = TableIdentifier("tbl3", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(
       inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
@@ -115,12 +125,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       |}
     """.stripMargin
 
-  def defaultTablePath(tableName: String): String = {
-    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName))
-  }
-
-  val dataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl4", Some("default")),
+  lazy val dataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl4", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> defaultTablePath("tbl4"))),
     schema = new StructType(),
@@ -129,8 +135,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       "spark.sql.sources.schema.numParts" -> "1",
       "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
 
-  val hiveCompatibleDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl5", Some("default")),
+  lazy val hiveCompatibleDataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl5", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> defaultTablePath("tbl5"))),
     schema = simpleSchema,
@@ -139,8 +145,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       "spark.sql.sources.schema.numParts" -> "1",
       "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
 
-  val partitionedDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl6", Some("default")),
+  lazy val partitionedDataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl6", Some("test_db")),
     tableType = CatalogTableType.MANAGED,
     storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> defaultTablePath("tbl6"))),
     schema = new StructType(),
@@ -151,20 +157,20 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       "spark.sql.sources.schema.numPartCols" -> "1",
       "spark.sql.sources.schema.partCol.0" -> "j"))
 
-  val externalDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl7", Some("default")),
+  lazy val externalDataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl7", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(new Path(defaultTablePath("tbl7"), "-__PLACEHOLDER__").toString),
+      locationUri = Some(defaultTablePath("tbl7") + "-__PLACEHOLDER__"),
       properties = Map("path" -> tempDir.getAbsolutePath)),
     schema = new StructType(),
     properties = Map(
       "spark.sql.sources.provider" -> "json",
       "spark.sql.sources.schema.numParts" -> "1",
       "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
 
-  val hiveCompatibleExternalDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl8", Some("default")),
+  lazy val hiveCompatibleExternalDataSourceTable = CatalogTable(
+    identifier = TableIdentifier("tbl8", Some("test_db")),
    tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
       locationUri = Some(tempDir.getAbsolutePath),
@@ -175,38 +181,71 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with TestH
       "spark.sql.sources.schema.numParts" -> "1",
       "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
 
-  val dataSourceTableWithoutSchema = CatalogTable(
-    identifier = TableIdentifier("tbl9", Some("default")),
+  lazy val dataSourceTableWithoutSchema = CatalogTable(
+    identifier = TableIdentifier("tbl9", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(new Path(defaultTablePath("tbl9"), "-__PLACEHOLDER__").toString),
+      locationUri = Some(defaultTablePath("tbl9") + "-__PLACEHOLDER__"),
       properties = Map("path" -> tempDir.getAbsolutePath)),
     schema = new StructType(),
     properties = Map("spark.sql.sources.provider" -> "json"))
 
-  // A list of all raw tables we want to test, with their expected schema and table location.
-  val rawTablesAndExpectations = Seq(
-    (hiveTable, simpleSchema, None),
-    (externalHiveTable, simpleSchema, Some(tempDir.getCanonicalPath)),
-    (partitionedHiveTable, partitionedSchema, None),
-    (dataSourceTable, simpleSchema, None),
-    (hiveCompatibleDataSourceTable, simpleSchema, None),
-    (partitionedDataSourceTable, partitionedSchema, None),
-    (externalDataSourceTable, simpleSchema, Some(tempDir.getCanonicalPath)),
-    (hiveCompatibleExternalDataSourceTable, simpleSchema, Some(tempDir.getCanonicalPath)),
-    (dataSourceTableWithoutSchema, new StructType(), None))
+  // A list of all raw tables we want to test, with their expected schema.
+  lazy val rawTablesAndExpectations = Seq(
+    hiveTable -> simpleSchema,
+    externalHiveTable -> simpleSchema,
+    partitionedHiveTable -> partitionedSchema,
+    dataSourceTable -> simpleSchema,
+    hiveCompatibleDataSourceTable -> simpleSchema,
+    partitionedDataSourceTable -> partitionedSchema,
+    externalDataSourceTable -> simpleSchema,
+    hiveCompatibleExternalDataSourceTable -> simpleSchema,
+    dataSourceTableWithoutSchema -> new StructType())
 
   test("make sure we can read table created by old version of Spark") {
-    for ((tbl, expectedSchema, expectedLocation) <- rawTablesAndExpectations) {
-      val readBack = spark.sharedState.externalCatalog.getTable(
-        tbl.identifier.database.get, tbl.identifier.table)
-
+    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
+      val readBack = getTableMetadata(tbl.identifier.table)
       assert(readBack.schema == expectedSchema)
-      expectedLocation.foreach { loc =>
+
+      if (tbl.tableType == CatalogTableType.EXTERNAL) {
         // trim the URI prefix
         val tableLocation = new URI(readBack.storage.locationUri.get).getPath
-        assert(tableLocation == loc)
+        assert(tableLocation == tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("make sure we can alter table location created by old version of Spark") {
+    withTempDir { dir =>
+      for ((tbl, _) <- rawTablesAndExpectations if tbl.tableType == CatalogTableType.EXTERNAL) {
+        sql(s"ALTER TABLE ${tbl.identifier} SET LOCATION '${dir.getAbsolutePath}'")
+
+        val readBack = getTableMetadata(tbl.identifier.table)
+
+        // trim the URI prefix
+        val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+        assert(actualTableLocation == dir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("make sure we can rename table created by old version of Spark") {
+    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
+      val newName = tbl.identifier.table + "_renamed"
+      sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
+
+      val readBack = getTableMetadata(newName)
+      assert(readBack.schema == expectedSchema)
+
+      // trim the URI prefix
+      val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+      val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) {
+        tempDir.getAbsolutePath
+      } else {
+        // trim the URI prefix
+        new URI(defaultTablePath(newName)).getPath
       }
+      assert(actualTableLocation == expectedLocation)
     }
   }
 }
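
For reference, a minimal sketch of the read-back pattern these tests exercise: fetch a table's raw metadata straight from the external catalog and strip the URI scheme from its location before comparing it with a plain filesystem path. It assumes a TestHive-backed SparkSession and a table in the test_db database created by beforeEach above; readBackLocation is a hypothetical helper, not part of the diff.

import java.net.URI
import org.apache.spark.sql.SparkSession

// Hypothetical helper mirroring the assertions above: read the table metadata
// through the external catalog and return its location as a bare path.
// In this Spark 2.1-era code, storage.locationUri is an Option[String] that may
// carry a scheme such as "file:", which new URI(...).getPath strips off.
def readBackLocation(spark: SparkSession, db: String, table: String): String = {
  val meta = spark.sharedState.externalCatalog.getTable(db, table)
  new URI(meta.storage.locationUri.get).getPath
}

// e.g. readBackLocation(spark, "test_db", "tbl2") is expected to equal
// tempDir.getAbsolutePath for the external tables set up in this suite.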