
Commit 9447011

Further address the comments
Change-Id: I38a5491e555496004204bf99634f7147dac6c642
Parent: 1d1440b

File tree: 3 files changed (+48, -38 lines)

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 23 additions & 0 deletions

@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -353,6 +354,28 @@ class SparkHadoopUtil extends Logging {
     }
     buffer.toString
   }
+
+  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
+    val perm = status.getPermission
+    val ugi = UserGroupInformation.getCurrentUser
+
+    if (ugi.getShortUserName == status.getOwner) {
+      if (perm.getUserAction.implies(mode)) {
+        return true
+      }
+    } else if (ugi.getGroupNames.contains(status.getGroup)) {
+      if (perm.getGroupAction.implies(mode)) {
+        return true
+      }
+    } else if (perm.getOtherAction.implies(mode)) {
+      return true
+    }
+
+    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
+      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
+      s"${if (status.isDirectory) "d" else "-"}$perm")
+    false
+  }
 }
 
 object SparkHadoopUtil {
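
The new helper walks the same owner, then group, then other cascade that POSIX permission checks use, delegating the bit comparison to `FsAction.implies`. Note that the cascade is exclusive: once the caller matches the owner (or the group), the weaker bit sets are never consulted, which is why a non-matching owner branch falls through to `false` rather than to the `other` bits. A minimal, self-contained sketch of the `implies` comparison; the demo object and the `rw-r-----` permission are made up for illustration:

```scala
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

object PermissionImpliesDemo {
  def main(args: Array[String]): Unit = {
    // rw-r----- : owner may read and write, group may read, others get nothing
    val perm = new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE)

    // FsAction.implies(other) is true when this action grants at least `other`
    println(perm.getUserAction.implies(FsAction.READ))   // true:  rw- covers r--
    println(perm.getGroupAction.implies(FsAction.READ))  // true:  r-- covers r--
    println(perm.getOtherAction.implies(FsAction.READ))  // false: --- does not cover r--
  }
}
```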

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 9 additions & 37 deletions

@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
-import org.apache.hadoop.security.{AccessControlException, UserGroupInformation}
+import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -319,42 +319,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // scan for modified applications, replay and merge them
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
-          try {
-            val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-
-            def canAccess = {
-              val perm = entry.getPermission
-              val ugi = UserGroupInformation.getCurrentUser
-              val user = ugi.getShortUserName
-              val groups = ugi.getGroupNames
-              if (user == entry.getOwner && perm.getUserAction.implies(FsAction.READ)) {
-                true
-              } else if (groups.contains(entry.getGroup) &&
-                  perm.getGroupAction.implies(FsAction.READ)) {
-                true
-              } else if (perm.getOtherAction.implies(FsAction.READ)) {
-                true
-              } else {
-                throw new AccessControlException(s"Permission denied: user=$user, " +
-                  s"path=${entry.getPath}:${entry.getOwner}:${entry.getGroup}" +
-                  s"${if (entry.isDirectory) "d" else "-"}$perm")
-              }
-            }
-
-            !entry.isDirectory() &&
-              // FsHistoryProvider generates a hidden file which can't be read. Accidentally
-              // reading a garbage file is safe, but we would log an error which can be scary to
-              // the end-user.
-              !entry.getPath().getName().startsWith(".") &&
-              prevFileSize < entry.getLen() &&
-              canAccess
-          } catch {
-            case _: AccessControlException =>
-              // Do not use "logInfo" since these messages can get pretty noisy if printed on
-              // every poll.
-              logDebug(s"No permission to read $entry, ignoring.")
-              false
-          }
+          val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+          !entry.isDirectory() &&
+            // FsHistoryProvider generates a hidden file which can't be read. Accidentally
+            // reading a garbage file is safe, but we would log an error which can be scary to
+            // the end-user.
+            !entry.getPath().getName().startsWith(".") &&
+            prevFileSize < entry.getLen() &&
+            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
         }
         .flatMap { entry => Some(entry) }
         .sortWith { case (entry1, entry2) =>
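
Two properties of the rewritten filter are worth noting. First, it drops the exception-as-control-flow pattern: the inlined `canAccess` threw `AccessControlException` on denial, whereas `checkAccessPermission` returns a `Boolean` and logs at debug level. Second, Scala's `&&` short-circuits left to right, so the cheap directory, name, and size tests screen out most entries before the permission check (which consults the current `UserGroupInformation`) is evaluated at all. A stripped-down model of that predicate chain, with a hypothetical `Entry` standing in for Hadoop's `FileStatus`:

```scala
// Hypothetical stand-in for the FileStatus fields the filter consults.
case class Entry(name: String, isDirectory: Boolean, len: Long, readable: Boolean)

object FilterOrderDemo {
  def shouldReplay(entry: Entry, prevFileSize: Long): Boolean =
    !entry.isDirectory &&
      !entry.name.startsWith(".") && // the provider's own hidden files are skipped
      prevFileSize < entry.len &&    // only files that grew since the last scan
      entry.readable                 // the costlier permission check runs last

  def main(args: Array[String]): Unit = {
    // A hidden file is rejected before the permission check is ever evaluated.
    val hidden = Entry(".inprogress", isDirectory = false, len = 10L, readable = true)
    println(shouldReplay(hidden, prevFileSize = 0L)) // false
  }
}
```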

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 16 additions & 1 deletion

@@ -27,7 +27,8 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
@@ -37,6 +38,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
@@ -154,7 +156,20 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
+
+    val path = new Path(logFile2.toURI)
+    val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
+    val status = fs.getFileStatus(path)
+    SparkHadoopUtil.get.checkAccessPermission(status, FsAction.READ) should be (true)
+
     logFile2.setReadable(false, false)
+    val status1 = fs.getFileStatus(path)
+    SparkHadoopUtil.get.checkAccessPermission(status1, FsAction.READ) should be (false)
+
+    logFile2.setReadable(false, true)
+    val status2 = fs.getFileStatus(path)
+    SparkHadoopUtil.get.checkAccessPermission(status2, FsAction.READ) should be (false)
+
 
     updateAndCheck(provider) { list =>
       list.size should be (1)
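
The two `setReadable` calls exercise both forms of `java.io.File.setReadable(readable, ownerOnly)`: with `ownerOnly = false` the read bit is cleared for owner, group, and others, while `ownerOnly = true` clears it for the owner alone. The second case is the interesting one, since group and other read bits could still be set, but the suite runs as the file's owner, so the owner branch of the cascade decides the outcome and the READ check must fail in both states. A small standalone sketch of those semantics; the demo object and temp-file name are arbitrary:

```scala
import java.io.File

object SetReadableDemo {
  def main(args: Array[String]): Unit = {
    val f = File.createTempFile("perm-demo", ".log")
    try {
      println(f.canRead)          // true: a fresh temp file is readable by its owner
      f.setReadable(false, false) // ownerOnly = false: clear read for owner, group, others
      println(f.canRead)          // false
      f.setReadable(false, true)  // ownerOnly = true: clear read for the owner only
      println(f.canRead)          // false: the owner bit governs canRead here
    } finally {
      f.delete()
    }
  }
}
```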
