2 files changed, +6 -9 lines

Changed files:
  main/scala/org/apache/spark/deploy
  test/scala/org/apache/spark/metrics

@@ -149,16 +149,15 @@ class SparkHadoopUtil extends Logging {
     val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
     val baseline = (Thread.currentThread().getId, f())
 
+    /**
+     * This function may be called in both spawned child threads and parent task thread (in
+     * PythonRDD), and Hadoop FileSystem uses thread local variables to track the statistics.
+     * So we need a map to track the bytes read from the child threads and parent thread,
+     * summing them together to get the bytes read of this task.
+     */
     new Function0[Long] {
       private val bytesReadMap = new mutable.HashMap[Long, Long]()
 
-      /**
-       * Returns a function that can be called to calculate Hadoop FileSystem bytes read.
-       * This function may be called in both spawned child threads and parent task thread (in
-       * PythonRDD), and Hadoop FileSystem uses thread local variables to track the statistics.
-       * So we need a map to track the bytes read from the child threads and parent thread,
-       * summing them together to get the bytes read of this task.
-       */
       override def apply(): Long = {
         bytesReadMap.synchronized {
           bytesReadMap.put(Thread.currentThread().getId, f())
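
The scaladoc being moved carries the reasoning behind this callback: Hadoop FileSystem statistics live in thread-local variables, so a reading taken on one thread never includes bytes read on another. The callback therefore records a reading per calling thread in a shared map and sums the map to get the task-level total. Below is a minimal, self-contained sketch of that pattern; ThreadLocalBytesReadSketch, threadLocalBytes, and recordRead are illustrative stand-ins, not the Spark or Hadoop API.

import scala.collection.mutable

object ThreadLocalBytesReadSketch {
  // Illustrative stand-in for Hadoop's thread-local FileSystem statistics:
  // each thread only ever sees the bytes it read itself.
  private val threadLocalBytes = new ThreadLocal[Long] {
    override def initialValue(): Long = 0L
  }

  def recordRead(n: Long): Unit =
    threadLocalBytes.set(threadLocalBytes.get + n)

  // Shared map: thread id -> latest reading taken on that thread.
  private val bytesReadMap = new mutable.HashMap[Long, Long]()

  // The callback may run on the parent task thread and on spawned child
  // threads; each call stores the caller's thread-local reading and then
  // sums the readings from every thread seen so far.
  def bytesReadCallback(): Long = bytesReadMap.synchronized {
    bytesReadMap.put(Thread.currentThread().getId, threadLocalBytes.get)
    bytesReadMap.values.sum
  }

  def main(args: Array[String]): Unit = {
    recordRead(100L)                 // parent thread "reads" 100 bytes
    val child = new Thread(() => {
      recordRead(40L)                // child thread "reads" 40 bytes
      bytesReadCallback()            // child registers its reading in the map
    })
    child.start()
    child.join()
    println(bytesReadCallback())     // 140: parent and child readings summed
  }
}

Note that a child thread has to invoke the callback at least once before it exits; otherwise its thread-local reading never makes it into the shared map and is lost from the task total.
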
@@ -331,7 +331,6 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
         buf.iterator
       }.count()
     }
-    assert(bytesRead != 0)
     assert(bytesRead >= tmpFile.length())
   }
 
@@ -347,7 +346,6 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
         buf.iterator
       }.count()
     }
-    assert(bytesRead != 0)
     assert(bytesRead >= tmpFile.length())
   }
 }