
Commit 64bc339

Merge remote-tracking branch 'upstream/master' into svclbfgs

2 parents: 0bb5afe + 57accf6

130 files changed: +3014 −885 lines


common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java

Lines changed: 2 additions & 2 deletions
@@ -91,15 +91,15 @@ public void fetchBlocks(
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      TempShuffleFileManager tempShuffleFileManager) {
+      TempFileManager tempFileManager) {
     checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
           (blockIds1, listener1) -> {
             TransportClient client = clientFactory.createClient(host, port);
             new OneForOneBlockFetcher(client, appId, execId,
-              blockIds1, listener1, conf, tempShuffleFileManager).start();
+              blockIds1, listener1, conf, tempFileManager).start();
           };

       int maxRetries = conf.maxIORetries();

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 6 additions & 6 deletions
@@ -58,7 +58,7 @@ public class OneForOneBlockFetcher {
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
   private final TransportConf transportConf;
-  private final TempShuffleFileManager tempShuffleFileManager;
+  private final TempFileManager tempFileManager;

   private StreamHandle streamHandle = null;

@@ -79,14 +79,14 @@ public OneForOneBlockFetcher(
       String[] blockIds,
       BlockFetchingListener listener,
       TransportConf transportConf,
-      TempShuffleFileManager tempShuffleFileManager) {
+      TempFileManager tempFileManager) {
     this.client = client;
     this.openMessage = new OpenBlocks(appId, execId, blockIds);
     this.blockIds = blockIds;
     this.listener = listener;
     this.chunkCallback = new ChunkCallback();
     this.transportConf = transportConf;
-    this.tempShuffleFileManager = tempShuffleFileManager;
+    this.tempFileManager = tempFileManager;
   }

   /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -125,7 +125,7 @@ public void onSuccess(ByteBuffer response) {
       // Immediately request all chunks -- we expect that the total size of the request is
       // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
       for (int i = 0; i < streamHandle.numChunks; i++) {
-        if (tempShuffleFileManager != null) {
+        if (tempFileManager != null) {
           client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
             new DownloadCallback(i));
         } else {
@@ -164,7 +164,7 @@ private class DownloadCallback implements StreamCallback {
     private int chunkIndex;

     DownloadCallback(int chunkIndex) throws IOException {
-      this.targetFile = tempShuffleFileManager.createTempShuffleFile();
+      this.targetFile = tempFileManager.createTempFile();
       this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
       this.chunkIndex = chunkIndex;
     }
@@ -180,7 +180,7 @@ public void onComplete(String streamId) throws IOException {
       ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0,
         targetFile.length());
       listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
-      if (!tempShuffleFileManager.registerTempShuffleFileToClean(targetFile)) {
+      if (!tempFileManager.registerTempFileToClean(targetFile)) {
         targetFile.delete();
       }
     }

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java

Lines changed: 5 additions & 5 deletions
@@ -43,18 +43,18 @@ public void init(String appId) { }
    * @param execId the executor id.
    * @param blockIds block ids to fetch.
    * @param listener the listener to receive block fetching status.
-   * @param tempShuffleFileManager TempShuffleFileManager to create and clean temp shuffle files.
-   *                               If it's not <code>null</code>, the remote blocks will be streamed
-   *                               into temp shuffle files to reduce the memory usage, otherwise,
-   *                               they will be kept in memory.
+   * @param tempFileManager TempFileManager to create and clean temp files.
+   *                        If it's not <code>null</code>, the remote blocks will be streamed
+   *                        into temp shuffle files to reduce the memory usage, otherwise,
+   *                        they will be kept in memory.
    */
   public abstract void fetchBlocks(
       String host,
       int port,
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      TempShuffleFileManager tempShuffleFileManager);
+      TempFileManager tempFileManager);

   /**
    * Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to

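To make the new contract concrete, here is a minimal calling sketch against the updated fetchBlocks signature. The host, port, executor id, and block id are illustrative, and `client`/`tempFileManager` are assumed to be initialized elsewhere; passing a non-null TempFileManager streams remote blocks to temp files, while passing null keeps them in memory, as the javadoc above states.

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.{BlockFetchingListener, ExternalShuffleClient, TempFileManager}

def fetchExample(client: ExternalShuffleClient, tempFileManager: TempFileManager): Unit = {
  // With a non-null TempFileManager, each remote block is streamed into a
  // temp file instead of being buffered in memory.
  client.fetchBlocks("shuffle-host", 7337, "exec-1", Array("shuffle_0_0_0"),
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
        println(s"fetched $blockId (${data.size} bytes)")
      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit =
        exception.printStackTrace()
    },
    tempFileManager)
}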
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java renamed to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java

Lines changed: 6 additions & 6 deletions
@@ -20,17 +20,17 @@
 import java.io.File;

 /**
- * A manager to create temp shuffle block files to reduce the memory usage and also clean temp
+ * A manager to create temp block files to reduce the memory usage and also clean temp
  * files when they won't be used any more.
  */
-public interface TempShuffleFileManager {
+public interface TempFileManager {

-  /** Create a temp shuffle block file. */
-  File createTempShuffleFile();
+  /** Create a temp block file. */
+  File createTempFile();

   /**
-   * Register a temp shuffle file to clean up when it won't be used any more. Return whether the
+   * Register a temp file to clean up when it won't be used any more. Return whether the
    * file is registered successfully. If `false`, the caller should clean up the file by itself.
    */
-  boolean registerTempShuffleFileToClean(File file);
+  boolean registerTempFileToClean(File file);
 }

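As a rough sketch of how the renamed interface could be satisfied: the class below is illustrative only, not Spark's block-manager-backed implementation; the scratch-directory scheme and the `stop()` method are assumptions made for the example.

import java.io.File
import java.nio.file.Files
import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.spark.network.shuffle.TempFileManager

// Creates temp files in a scratch directory and remembers registered files
// for bulk cleanup; mirrors only the contract documented above.
class ScratchDirTempFileManager(scratchDir: File) extends TempFileManager {
  private val registered = new ConcurrentLinkedQueue[File]()
  @volatile private var stopped = false

  override def createTempFile(): File =
    Files.createTempFile(scratchDir.toPath, "fetch-", ".tmp").toFile

  // Per the contract: return false once the manager no longer accepts
  // registrations, so the caller deletes the file itself.
  override def registerTempFileToClean(file: File): Boolean =
    if (stopped) false else { registered.add(file); true }

  def stop(): Unit = {
    stopped = true
    var f = registered.poll()
    while (f != null) { f.delete(); f = registered.poll() }
  }
}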
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 5 additions & 4 deletions
@@ -172,10 +172,11 @@ public void free() {
   public void reset() {
     if (consumer != null) {
       consumer.freeArray(array);
-      // the call to consumer.allocateArray may trigger a spill
-      // which in turn access this instance and eventually re-enter this method and try to free the array again.
-      // by setting the array to null and its length to 0 we effectively make the spill code-path a no-op.
-      // setting the array to null also indicates that it has already been de-allocated which prevents a double de-allocation in free().
+      // the call to consumer.allocateArray may trigger a spill which in turn access this instance
+      // and eventually re-enter this method and try to free the array again. by setting the array
+      // to null and its length to 0 we effectively make the spill code-path a no-op. setting the
+      // array to null also indicates that it has already been de-allocated which prevents a double
+      // de-allocation in free().
       array = null;
       usableCapacity = 0;
       pos = 0;

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 1 deletion
@@ -662,7 +662,9 @@ private[spark] object SparkConf extends Logging {
     "spark.yarn.jars" -> Seq(
       AlternateConfig("spark.yarn.jar", "2.0")),
     "spark.yarn.access.hadoopFileSystems" -> Seq(
-      AlternateConfig("spark.yarn.access.namenodes", "2.2"))
+      AlternateConfig("spark.yarn.access.namenodes", "2.2")),
+    "spark.maxRemoteBlockSizeFetchToMem" -> Seq(
+      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3"))
   )

   /**

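The AlternateConfig entry added above wires the old key into SparkConf's deprecated-alias lookup, so reading the new key falls back to a value set under the old one. A small usage sketch, assuming this commit's alias map (the value is arbitrary):

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
  .set("spark.reducer.maxReqSizeShuffleToMem", "200m") // deprecated alias as of 2.3

// The deprecated-config lookup resolves the alternate key registered above.
assert(conf.get("spark.maxRemoteBlockSizeFetchToMem") == "200m")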
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 1 addition & 0 deletions
@@ -36,6 +36,7 @@ private[spark] object PythonEvalType {
   val NON_UDF = 0
   val SQL_BATCHED_UDF = 1
   val SQL_PANDAS_UDF = 2
+  val SQL_PANDAS_GROUPED_UDF = 3
 }

 /**

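A hedged sketch of how such an eval-type tag might be branched on; `describeEvalType` is hypothetical (the real handling lives in PySpark's worker protocol), and since PythonEvalType is private[spark], this assumes code inside an org.apache.spark package.

import org.apache.spark.api.python.PythonEvalType

def describeEvalType(evalType: Int): String = evalType match {
  case PythonEvalType.NON_UDF => "plain Python function"
  case PythonEvalType.SQL_BATCHED_UDF => "row-batched SQL UDF"
  case PythonEvalType.SQL_PANDAS_UDF => "vectorized pandas UDF"
  case PythonEvalType.SQL_PANDAS_GROUPED_UDF => "grouped pandas UDF (the new constant)"
  case other => s"unknown eval type $other"
}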
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 13 additions & 4 deletions
@@ -61,13 +61,17 @@ class SparkHadoopUtil extends Logging {
    * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
    */
   def runAsSparkUser(func: () => Unit) {
+    createSparkUser().doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = func()
+    })
+  }
+
+  def createSparkUser(): UserGroupInformation = {
     val user = Utils.getCurrentUserName()
-    logDebug("running as user: " + user)
+    logDebug("creating UGI for user: " + user)
     val ugi = UserGroupInformation.createRemoteUser(user)
     transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    ugi
   }

   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
@@ -417,6 +421,11 @@ class SparkHadoopUtil extends Logging {
     creds.readTokenStorageStream(new DataInputStream(tokensBuf))
     creds
   }
+
+  def isProxyUser(ugi: UserGroupInformation): Boolean = {
+    ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
+  }
+
 }

 object SparkHadoopUtil {

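The split above lets callers obtain the UGI directly, e.g. to check isProxyUser before deciding how to authenticate. A usage sketch: the filesystem work is a placeholder, and access assumes code living inside Spark's own packages, since SparkHadoopUtil is not public API.

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

val util = SparkHadoopUtil.get
val ugi: UserGroupInformation = util.createSparkUser()
if (util.isProxyUser(ugi)) {
  // a proxy-user login cannot re-login from a keytab; token-based auth applies
}
ugi.doAs(new PrivilegedExceptionAction[Unit] {
  override def run(): Unit = { /* placeholder: filesystem work as the Spark user */ }
})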
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 16 additions & 16 deletions
@@ -342,6 +342,22 @@ object SparkSubmit extends CommandLineUtils with Logging {
     val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
     val targetDir = Utils.createTempDir()

+    // assure a keytab is available from any place in a JVM
+    if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
+      if (args.principal != null) {
+        if (args.keytab != null) {
+          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+          // Add keytab and principal configurations in sysProps to make them available
+          // for later use; e.g. in spark sql, the isolated class loader used to talk
+          // to HiveMetastore will use these settings. They will be set as Java system
+          // properties and then loaded by SparkConf
+          sysProps.put("spark.yarn.keytab", args.keytab)
+          sysProps.put("spark.yarn.principal", args.principal)
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        }
+      }
+    }
+
     // Resolve glob path for different resources.
     args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
     args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
@@ -641,22 +657,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }
     }

-    // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
-      if (args.principal != null) {
-        if (args.keytab != null) {
-          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
-          // Add keytab and principal configurations in sysProps to make them available
-          // for later use; e.g. in spark sql, the isolated class loader used to talk
-          // to HiveMetastore will use these settings. They will be set as Java system
-          // properties and then loaded by SparkConf
-          sysProps.put("spark.yarn.keytab", args.keytab)
-          sysProps.put("spark.yarn.principal", args.principal)
-          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-        }
-      }
-    }
-
     if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
       setRMPrincipal(sysProps)
     }

core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala

Lines changed: 3 additions & 1 deletion
@@ -56,7 +56,9 @@ private[security] class HBaseDelegationTokenProvider
       None
     }

-  override def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Boolean = {
     hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
   }

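To illustrate why the signature gained a SparkConf parameter: a provider can now consult Spark settings as well as Hadoop ones when deciding whether tokens are needed. The provider class and both config keys below are hypothetical, not part of Spark.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

class MyServiceDelegationTokenProvider {
  // Mirrors the widened delegationTokensRequired signature above; the Spark
  // and Hadoop keys checked here are made up for illustration.
  def delegationTokensRequired(sparkConf: SparkConf, hadoopConf: Configuration): Boolean = {
    sparkConf.getBoolean("spark.myservice.security.enabled", defaultValue = false) &&
      hadoopConf.get("myservice.security.authentication") == "kerberos"
  }
}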