@@ -201,63 +201,67 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
201201 var defaultV1 : String = null
202202 var defaultV2 : String = null
203203
204- withMultipleConnectionJdbcStatement(Seq (
204+ withMultipleConnectionJdbcStatement(
205205 // create table
206206 { statement =>
207- val queries = Seq (
208- " DROP TABLE IF EXISTS test_map" ,
209- " CREATE TABLE test_map(key INT, value STRING)" ,
210- s " LOAD DATA LOCAL INPATH ' ${TestData .smallKv}' OVERWRITE INTO TABLE test_map " ,
211- " CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC" )
212-
213- queries.foreach(statement.execute)
214-
215- val rs1 = statement.executeQuery(" SELECT key FROM test_table ORDER BY KEY DESC" )
216- val buf1 = new collection.mutable.ArrayBuffer [Int ]()
217- while (rs1.next()) {
218- buf1 += rs1.getInt(1 )
219- }
220- rs1.close()
221-
222- val rs2 = statement.executeQuery(" SELECT key FROM test_map ORDER BY KEY DESC" )
223- val buf2 = new collection.mutable.ArrayBuffer [Int ]()
224- while (rs2.next()) {
225- buf2 += rs2.getInt(1 )
226- }
227- rs2.close()
228-
229- assert(buf1 === buf2)
230- },
207+
208+ val queries = Seq (
209+ " DROP TABLE IF EXISTS test_map" ,
210+ " CREATE TABLE test_map(key INT, value STRING)" ,
211+ s " LOAD DATA LOCAL INPATH ' ${TestData .smallKv}' OVERWRITE INTO TABLE test_map " ,
212+ " CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC" )
213+
214+ queries.foreach(statement.execute)
215+
216+ val rs1 = statement.executeQuery(" SELECT key FROM test_table ORDER BY KEY DESC" )
217+ val buf1 = new collection.mutable.ArrayBuffer [Int ]()
218+ while (rs1.next()) {
219+ buf1 += rs1.getInt(1 )
220+ }
221+ rs1.close()
222+
223+ val rs2 = statement.executeQuery(" SELECT key FROM test_map ORDER BY KEY DESC" )
224+ val buf2 = new collection.mutable.ArrayBuffer [Int ]()
225+ while (rs2.next()) {
226+ buf2 += rs2.getInt(1 )
227+ }
228+ rs2.close()
229+
230+ assert(buf1 === buf2)
231+ },
231232
232233 // first session, we get the default value of the session status
233234 { statement =>
235+
234236 val rs1 = statement.executeQuery(s " SET ${SQLConf .SHUFFLE_PARTITIONS }" )
235237 rs1.next()
236238 defaultV1 = rs1.getString(1 )
237239 assert(defaultV1 != " 200" )
238240 rs1.close()
239241
240- val rs2 = statement.executeQuery(" SET hive.cli.print.header" )
242+ val rs2 = statement.executeQuery(" SET hive.cli.print.header" )
241243 rs2.next()
244+
242245 defaultV2 = rs2.getString(1 )
243246 assert(defaultV1 != " true" )
244247 rs2.close()
245248 },
246249
247250 // second session, we update the session status
248251 { statement =>
252+
249253 val queries = Seq (
250254 s " SET ${SQLConf .SHUFFLE_PARTITIONS }=291 " ,
251255 " SET hive.cli.print.header=true"
252256 )
253257
254- queries.map(statement.execute)
258+ queries.map(statement.execute)
255259 val rs1 = statement.executeQuery(s " SET ${SQLConf .SHUFFLE_PARTITIONS }" )
256260 rs1.next()
257261 assert(" spark.sql.shuffle.partitions=291" === rs1.getString(1 ))
258262 rs1.close()
259263
260- val rs2 = statement.executeQuery(" SET hive.cli.print.header" )
264+ val rs2 = statement.executeQuery(" SET hive.cli.print.header" )
261265 rs2.next()
262266 assert(" hive.cli.print.header=true" === rs2.getString(1 ))
263267 rs2.close()
@@ -266,67 +270,71 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
266270 // third session, we get the latest session status, supposed to be the
267271 // default value
268272 { statement =>
269- val rs1 = statement.executeQuery(s " SET ${SQLConf .SHUFFLE_PARTITIONS }" )
270- rs1.next()
271- assert(defaultV1 === rs1.getString(1 ))
272- rs1.close()
273273
274- val rs2 = statement.executeQuery(" SET hive.cli.print.header" )
275- rs2.next()
276- assert(defaultV2 === rs2.getString(1 ))
277- rs2.close()
278- },
274+ val rs1 = statement.executeQuery(s " SET ${SQLConf .SHUFFLE_PARTITIONS }" )
275+ rs1.next()
276+ assert(defaultV1 === rs1.getString(1 ))
277+ rs1.close()
278+
279+ val rs2 = statement.executeQuery(" SET hive.cli.print.header" )
280+ rs2.next()
281+ assert(defaultV2 === rs2.getString(1 ))
282+ rs2.close()
283+ },
279284
280285 // accessing the cached data in another session
281286 { statement =>
287+
282288 val rs1 = statement.executeQuery(" SELECT key FROM test_table ORDER BY KEY DESC" )
283289 val buf1 = new collection.mutable.ArrayBuffer [Int ]()
284290 while (rs1.next()) {
285- buf1 += rs1.getInt(1 )
286- }
291+ buf1 += rs1.getInt(1 )
292+ }
287293 rs1.close()
288294
289- val rs2 = statement.executeQuery(" SELECT key FROM test_map ORDER BY KEY DESC" )
295+ val rs2 = statement.executeQuery(" SELECT key FROM test_map ORDER BY KEY DESC" )
290296 val buf2 = new collection.mutable.ArrayBuffer [Int ]()
291297 while (rs2.next()) {
292- buf2 += rs2.getInt(1 )
293- }
298+ buf2 += rs2.getInt(1 )
299+ }
294300 rs2.close()
295301
296- assert(buf1 === buf2)
302+ assert(buf1 === buf2)
297303 statement.executeQuery(" UNCACHE TABLE test_table" )
298304
299- // TODO need to figure out how to determine if the data loaded from cache
305+ // TODO need to figure out how to determine if the data loaded from cache
300306 val rs3 = statement.executeQuery(" SELECT key FROM test_map ORDER BY KEY DESC" )
301307 val buf3 = new collection.mutable.ArrayBuffer [Int ]()
302308 while (rs3.next()) {
303- buf3 += rs3.getInt(1 )
304- }
309+ buf3 += rs3.getInt(1 )
310+ }
305311 rs3.close()
306312
307- assert(buf1 === buf3)
313+ assert(buf1 === buf3)
308314 },
309315
310316 // accessing the uncached table
311317 { statement =>
318+
312319 // TODO need to figure out how to determine if the data loaded from cache
313- val rs1 = statement.executeQuery(" SELECT key FROM test_table ORDER BY KEY DESC" )
320+ val rs1 = statement.executeQuery(" SELECT key FROM test_table ORDER BY KEY DESC" )
314321 val buf1 = new collection.mutable.ArrayBuffer [Int ]()
315322 while (rs1.next()) {
316- buf1 += rs1.getInt(1 )
317- }
323+ buf1 += rs1.getInt(1 )
324+ }
318325 rs1.close()
319326
320- val rs2 = statement.executeQuery(" SELECT key FROM test_map ORDER BY KEY DESC" )
327+ val rs2 = statement.executeQuery(" SELECT key FROM test_map ORDER BY KEY DESC" )
321328 val buf2 = new collection.mutable.ArrayBuffer [Int ]()
322329 while (rs2.next()) {
323- buf2 += rs2.getInt(1 )
324- }
330+ buf2 += rs2.getInt(1 )
331+ }
325332 rs2.close()
326333
327- assert(buf1 === buf2)
328- }))
329- }
334+ assert(buf1 === buf2)
335+ }
336+ )
337+ }
330338}
331339
332340class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
@@ -377,7 +385,7 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
377385 s " jdbc:hive2://localhost: $serverPort/ "
378386 }
379387
380- def withMultipleConnectionJdbcStatement (fs : Seq [ Statement => Unit ] ) {
388+ def withMultipleConnectionJdbcStatement (fs : ( Statement => Unit ) * ) {
381389 val user = System .getProperty(" user.name" )
382390 val connections = fs.map { _ => DriverManager .getConnection(jdbcUri, user, " " ) }
383391 val statements = connections.map(_.createStatement())
@@ -391,7 +399,7 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
391399 }
392400
393401 def withJdbcStatement (f : Statement => Unit ) {
394- withMultipleConnectionJdbcStatement(Seq (f) )
402+ withMultipleConnectionJdbcStatement(f )
395403 }
396404}
397405
0 commit comments