
Commit 38444ea

Restore the workaround
1 parent 14a199c commit 38444ea

9 files changed (+59 -24 lines changed)

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -55,7 +55,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
   }
 
 
-  test("OffsetSeqLog serialization - deserialization") {
+  testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
     withTempDir { temp =>
       // use non-existent directory to test whether log make the dir
       val dir = new File(temp, "dir")
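Most of the test changes below follow this same pattern: `test(...)` becomes `testWithUninterruptibleThread(...)`, so that the test body runs on an UninterruptibleThread, which HDFSMetadataLog.add() now requires when the metadata log is backed by a local file system (see the HDFSMetadataLog diff further down). As a rough, hypothetical sketch of what such a helper has to do (the real one is defined in Spark's SparkFunSuite and also accepts a quietly flag): the body is executed on an UninterruptibleThread and any failure is rethrown on the calling test thread.

package org.apache.spark.util

import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch only, NOT the actual SparkFunSuite helper. It is placed under
// org.apache.spark because UninterruptibleThread is a Spark-internal utility class.
object UninterruptibleTestHelper {

  // Run `body` on an UninterruptibleThread and rethrow any exception or assertion
  // error on the calling (test) thread.
  def runOnUninterruptibleThread(name: String)(body: => Unit): Unit = {
    val failure = new AtomicReference[Throwable](null)
    val thread = new UninterruptibleThread(s"test thread for: $name") {
      override def run(): Unit = {
        try body catch { case t: Throwable => failure.set(t) }
      }
    }
    thread.start()
    thread.join()
    if (failure.get() != null) {
      throw failure.get()
    }
  }
}

With a helper shaped like this, the suites below can keep their existing bodies unchanged while satisfying the new precondition in HDFSMetadataLog.add().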

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 45 additions & 1 deletion

@@ -109,7 +109,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   override def add(batchId: Long, metadata: T): Boolean = {
     get(batchId).map(_ => false).getOrElse {
       // Only write metadata when the batch has not yet been written
-      writeBatch(batchId, metadata)
+      if (fileManager.isLocalFileSystem) {
+        Thread.currentThread match {
+          case ut: UninterruptibleThread =>
+            // When using a local file system, "writeBatch" must be called on a
+            // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
+            // while writing the batch file.
+            //
+            // This is because Hadoop "Shell.runCommand" swallows InterruptException (HADOOP-14084).
+            // If the user tries to stop a query, and the thread running "Shell.runCommand" is
+            // interrupted, then InterruptException will be dropped and the query will be still
+            // running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+            // "Shell.runCommand" to set the file permission if using the local file system)
+            //
+            // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which
+            // allows us to disable interrupts here, in order to propagate the interrupt state
+            // correctly. Also see SPARK-19599.
+            ut.runUninterruptibly { writeBatch(batchId, metadata) }
+          case _ =>
+            throw new IllegalStateException(
+              "HDFSMetadataLog.add() on a local file system must be executed on " +
+                "a o.a.spark.util.UninterruptibleThread")
+        }
+      } else {
+        // For a distributed file system, such as HDFS or S3, if the network is broken, write
+        // operations may just hang until timeout. We should enable interrupts to allow stopping
+        // the query fast.
+        writeBatch(batchId, metadata)
+      }
       true
     }
   }
@@ -300,6 +327,9 @@ object HDFSMetadataLog {
 
     /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
     def delete(path: Path): Unit
+
+    /** Whether the file systme is a local FS. */
+    def isLocalFileSystem: Boolean
   }
 
   /**
@@ -344,6 +374,13 @@ object HDFSMetadataLog {
          // ignore if file has already been deleted
       }
     }
+
+    override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
+      case _: local.LocalFs | _: local.RawLocalFs =>
+        // LocalFs = RawLocalFs + ChecksumFs
+        true
+      case _ => false
+    }
   }
 
   /**
@@ -400,5 +437,12 @@ object HDFSMetadataLog {
          // ignore if file has already been deleted
       }
     }
+
+    override def isLocalFileSystem: Boolean = fs match {
+      case _: LocalFileSystem | _: RawLocalFileSystem =>
+        // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
+        true
+      case _ => false
+    }
   }
 }
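The core of the restored workaround is the `ut.runUninterruptibly { writeBatch(batchId, metadata) }` guard above: while the block is running, an interrupt aimed at the thread is deferred rather than delivered into Hadoop's Shell.runCommand, and it is re-delivered once the block completes, so a stop request is not silently swallowed (HADOOP-14084, SPARK-19599). The following is a minimal sketch of that behaviour, not part of the commit; it assumes the documented UninterruptibleThread semantics and is placed under org.apache.spark because the class is Spark-internal.

package org.apache.spark.util

import java.util.concurrent.CountDownLatch

// Illustration only: an interrupt arriving inside runUninterruptibly is deferred until
// the block finishes, which is the property HDFSMetadataLog.add relies on.
object RunUninterruptiblyDemo {
  def main(args: Array[String]): Unit = {
    val entered = new CountDownLatch(1)
    val worker = new UninterruptibleThread("metadata-writer") {
      override def run(): Unit = {
        runUninterruptibly {
          entered.countDown()
          Thread.sleep(500) // stand-in for writeBatch(batchId, metadata) on a local FS
          // Expected: false, because the interrupt is being deferred.
          println(s"interrupted inside the block? ${Thread.currentThread().isInterrupted}")
        }
        // Expected: true, the deferred interrupt is re-delivered after the block ends.
        println(s"interrupted after the block? ${Thread.currentThread().isInterrupted}")
      }
    }
    worker.start()
    entered.await()
    worker.interrupt() // simulate a query stop arriving in the middle of the write
    worker.join()
  }
}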

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 2 additions & 2 deletions

@@ -179,8 +179,8 @@ class StreamExecution(
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
-   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential endless loop in
-   * `KafkaConsumer`. See KAFKA-1894 for more details.
+   * [[org.apache.spark.util.UninterruptibleThread]] to avoid swallowing `InterruptException` when
+   * using [[HDFSMetadataLog]]. See SPARK-19599 for more details.
    */
   val microBatchThread =
     new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
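For context, this is roughly the arrangement the updated comment describes: the micro-batch loop runs on an UninterruptibleThread, so every HDFSMetadataLog write issued from inside the loop satisfies the add() precondition above, and a stop request delivered as an interrupt is deferred rather than lost if it lands mid-write. The sketch below is an invented stand-in, not the actual StreamExecution code; the names MicroBatchRunner and batchLoop are illustrative only.

package org.apache.spark.util

// Invented illustration, not StreamExecution itself: a batch loop hosted on an
// UninterruptibleThread, with a stop() that interrupts and joins that thread.
class MicroBatchRunner(name: String)(batchLoop: => Unit) {

  @volatile private var stopped = false

  private val microBatchThread = new UninterruptibleThread(name) {
    override def run(): Unit = {
      while (!stopped) {
        batchLoop // offset-log and commit-log writes would happen in here
      }
    }
  }

  def start(): Unit = microBatchThread.start()

  def stop(): Unit = {
    stopped = true
    // The interrupt breaks the loop out of blocking I/O; if the thread is currently
    // inside runUninterruptibly (writing a batch file on a local FS), the interrupt
    // is deferred until that write completes.
    microBatchThread.interrupt()
    microBatchThread.join()
  }
}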

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -156,7 +156,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       })
   }
 
-  test("compact") {
+  testWithUninterruptibleThread("compact") {
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
@@ -174,7 +174,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       })
   }
 
-  test("delete expired file") {
+  testWithUninterruptibleThread("delete expired file") {
     // Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour deterministically
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = 0,

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -129,7 +129,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("compact") {
+  testWithUninterruptibleThread("compact") {
     withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
         for (batchId <- 0 to 10) {
@@ -149,7 +149,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("delete expired file") {
+  testWithUninterruptibleThread("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
     // deterministically and one min batches to retain
     withSQLConf(

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 5 additions & 4 deletions

@@ -57,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("HDFSMetadataLog: basic") {
+  testWithUninterruptibleThread("HDFSMetadataLog: basic") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
       val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -82,7 +82,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
+  testWithUninterruptibleThread(
+    "HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = true) {
     spark.conf.set(
       s"fs.$scheme.impl",
       classOf[FakeFileSystem].getName)
@@ -102,7 +103,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("HDFSMetadataLog: purge") {
+  testWithUninterruptibleThread("HDFSMetadataLog: purge") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -127,7 +128,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("HDFSMetadataLog: restart") {
+  testWithUninterruptibleThread("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -36,7 +36,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
       OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
   }
 
-  test("OffsetSeqLog - serialization - deserialization") {
+  testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
       val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -1174,7 +1174,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(map.isNewFile("b", 10))
   }
 
-  test("do not recheck that files exist during getBatch") {
+  testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
     withTempDir { temp =>
       spark.conf.set(
         s"fs.$scheme.impl",

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala

Lines changed: 0 additions & 10 deletions

@@ -49,16 +49,6 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
   }
 
   after {
-    val activeQueries = spark.streams.active
-    if (activeQueries.nonEmpty) {
-      for (query <- activeQueries) {
-        val stackTrace =
-          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.microBatchThread.getStackTrace
-        // scalastyle:off
-        println(stackTrace.mkString("\n"))
-        // scalastyle:on
-      }
-    }
     assert(spark.streams.active.isEmpty)
     spark.streams.resetTerminated()
   }
