Skip to content

Commit 736c907

Browse files
committed
Remove Java 7 support:
- Move external/java8-tests tests into core, streaming, sql and remove
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings
1 parent f776e3b commit 736c907

File tree

101 files changed

+513
-1186
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+513
-1186
lines changed

assembly/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@
187187
<plugin>
188188
<groupId>org.apache.maven.plugins</groupId>
189189
<artifactId>maven-assembly-plugin</artifactId>
190+
<version>3.0.0</version>
190191
<executions>
191192
<execution>
192193
<id>dist</id>

build/mvn

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
2222
# Preserve the calling directory
2323
_CALLING_DIR="$(pwd)"
2424
# Options used during compilation
25-
_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
25+
_COMPILE_JVM_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
2626

2727
# Installs any application tarball given a URL, the expected tarball name,
2828
# and, optionally, a checkable binary path to determine if the binary has
@@ -141,13 +141,9 @@ cd "${_CALLING_DIR}"
141141
# Now that zinc is ensured to be installed, check its status and, if its
142142
# not running or just installed, start it
143143
if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
144-
ZINC_JAVA_HOME=
145-
if [ -n "$JAVA_7_HOME" ]; then
146-
ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME"
147-
fi
148144
export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
149145
"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
150-
$ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \
146+
"${ZINC_BIN}" -start -port ${ZINC_PORT} \
151147
-scala-compiler "${SCALA_COMPILER}" \
152148
-scala-library "${SCALA_LIBRARY}" &>/dev/null
153149
fi

build/sbt-launch-lib.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ get_mem_opts () {
117117
(( $perm < 4096 )) || perm=4096
118118
local codecache=$(( $perm / 2 ))
119119

120-
echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m"
120+
echo "-Xms${mem}m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m"
121121
}
122122

123123
require_arg () {

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 49 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import com.google.common.base.Throwables;
3333
import com.google.common.util.concurrent.SettableFuture;
3434
import io.netty.channel.Channel;
35-
import io.netty.channel.ChannelFuture;
36-
import io.netty.channel.ChannelFutureListener;
3735
import org.slf4j.Logger;
3836
import org.slf4j.LoggerFactory;
3937

@@ -133,40 +131,36 @@ public void setClientId(String id) {
133131
*/
134132
public void fetchChunk(
135133
long streamId,
136-
final int chunkIndex,
137-
final ChunkReceivedCallback callback) {
138-
final long startTime = System.currentTimeMillis();
134+
int chunkIndex,
135+
ChunkReceivedCallback callback) {
136+
long startTime = System.currentTimeMillis();
139137
if (logger.isDebugEnabled()) {
140138
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
141139
}
142140

143-
final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
141+
StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
144142
handler.addFetchRequest(streamChunkId, callback);
145143

146-
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
147-
new ChannelFutureListener() {
148-
@Override
149-
public void operationComplete(ChannelFuture future) throws Exception {
150-
if (future.isSuccess()) {
151-
long timeTaken = System.currentTimeMillis() - startTime;
152-
if (logger.isTraceEnabled()) {
153-
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
154-
getRemoteAddress(channel), timeTaken);
155-
}
156-
} else {
157-
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
158-
getRemoteAddress(channel), future.cause());
159-
logger.error(errorMsg, future.cause());
160-
handler.removeFetchRequest(streamChunkId);
161-
channel.close();
162-
try {
163-
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
164-
} catch (Exception e) {
165-
logger.error("Uncaught exception in RPC response callback handler!", e);
166-
}
167-
}
144+
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
145+
if (future.isSuccess()) {
146+
long timeTaken = System.currentTimeMillis() - startTime;
147+
if (logger.isTraceEnabled()) {
148+
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
149+
getRemoteAddress(channel), timeTaken);
168150
}
169-
});
151+
} else {
152+
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
153+
getRemoteAddress(channel), future.cause());
154+
logger.error(errorMsg, future.cause());
155+
handler.removeFetchRequest(streamChunkId);
156+
channel.close();
157+
try {
158+
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
159+
} catch (Exception e) {
160+
logger.error("Uncaught exception in RPC response callback handler!", e);
161+
}
162+
}
163+
});
170164
}
171165

172166
/**
@@ -175,8 +169,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
175169
* @param streamId The stream to fetch.
176170
* @param callback Object to call with the stream data.
177171
*/
178-
public void stream(final String streamId, final StreamCallback callback) {
179-
final long startTime = System.currentTimeMillis();
172+
public void stream(String streamId, StreamCallback callback) {
173+
long startTime = System.currentTimeMillis();
180174
if (logger.isDebugEnabled()) {
181175
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
182176
}
@@ -186,29 +180,25 @@ public void stream(final String streamId, final StreamCallback callback) {
186180
// when responses arrive.
187181
synchronized (this) {
188182
handler.addStreamCallback(callback);
189-
channel.writeAndFlush(new StreamRequest(streamId)).addListener(
190-
new ChannelFutureListener() {
191-
@Override
192-
public void operationComplete(ChannelFuture future) throws Exception {
193-
if (future.isSuccess()) {
194-
long timeTaken = System.currentTimeMillis() - startTime;
195-
if (logger.isTraceEnabled()) {
196-
logger.trace("Sending request for {} to {} took {} ms", streamId,
197-
getRemoteAddress(channel), timeTaken);
198-
}
199-
} else {
200-
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
201-
getRemoteAddress(channel), future.cause());
202-
logger.error(errorMsg, future.cause());
203-
channel.close();
204-
try {
205-
callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
206-
} catch (Exception e) {
207-
logger.error("Uncaught exception in RPC response callback handler!", e);
208-
}
209-
}
183+
channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
184+
if (future.isSuccess()) {
185+
long timeTaken = System.currentTimeMillis() - startTime;
186+
if (logger.isTraceEnabled()) {
187+
logger.trace("Sending request for {} to {} took {} ms", streamId,
188+
getRemoteAddress(channel), timeTaken);
210189
}
211-
});
190+
} else {
191+
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
192+
getRemoteAddress(channel), future.cause());
193+
logger.error(errorMsg, future.cause());
194+
channel.close();
195+
try {
196+
callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
197+
} catch (Exception e) {
198+
logger.error("Uncaught exception in RPC response callback handler!", e);
199+
}
200+
}
201+
});
212202
}
213203
}
214204

@@ -220,19 +210,17 @@ public void operationComplete(ChannelFuture future) throws Exception {
220210
* @param callback Callback to handle the RPC's reply.
221211
* @return The RPC's id.
222212
*/
223-
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
224-
final long startTime = System.currentTimeMillis();
213+
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
214+
long startTime = System.currentTimeMillis();
225215
if (logger.isTraceEnabled()) {
226216
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
227217
}
228218

229-
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
219+
long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
230220
handler.addRpcRequest(requestId, callback);
231221

232-
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
233-
new ChannelFutureListener() {
234-
@Override
235-
public void operationComplete(ChannelFuture future) throws Exception {
222+
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
223+
.addListener(future -> {
236224
if (future.isSuccess()) {
237225
long timeTaken = System.currentTimeMillis() - startTime;
238226
if (logger.isTraceEnabled()) {
@@ -251,8 +239,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
251239
logger.error("Uncaught exception in RPC response callback handler!", e);
252240
}
253241
}
254-
}
255-
});
242+
});
256243

257244
return requestId;
258245
}

common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,7 @@
2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
2222
import java.security.GeneralSecurityException;
23-
import java.security.Key;
24-
import javax.crypto.KeyGenerator;
25-
import javax.crypto.Mac;
26-
import static java.nio.charset.StandardCharsets.UTF_8;
2723

28-
import com.google.common.base.Preconditions;
2924
import com.google.common.base.Throwables;
3025
import io.netty.buffer.ByteBuf;
3126
import io.netty.buffer.Unpooled;
@@ -37,7 +32,6 @@
3732
import org.apache.spark.network.client.TransportClientBootstrap;
3833
import org.apache.spark.network.sasl.SaslClientBootstrap;
3934
import org.apache.spark.network.sasl.SecretKeyHolder;
40-
import org.apache.spark.network.util.JavaUtils;
4135
import org.apache.spark.network.util.TransportConf;
4236

4337
/**
@@ -103,20 +97,18 @@ public void doBootstrap(TransportClient client, Channel channel) {
10397
private void doSparkAuth(TransportClient client, Channel channel)
10498
throws GeneralSecurityException, IOException {
10599

106-
AuthEngine engine = new AuthEngine(authUser, secretKeyHolder.getSecretKey(authUser), conf);
107-
try {
100+
String secretKey = secretKeyHolder.getSecretKey(authUser);
101+
try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) {
108102
ClientChallenge challenge = engine.challenge();
109103
ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
110104
challenge.encode(challengeData);
111105

112-
ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(),
113-
conf.authRTTimeoutMs());
106+
ByteBuffer responseData =
107+
client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
114108
ServerResponse response = ServerResponse.decodeMessage(responseData);
115109

116110
engine.validate(response);
117111
engine.sessionCipher().addToChannel(channel);
118-
} finally {
119-
engine.close();
120112
}
121113
}
122114

common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
package org.apache.spark.network.crypto;
1919

20-
import java.io.IOException;
2120
import java.nio.ByteBuffer;
22-
import javax.security.sasl.Sasl;
2321

2422
import com.google.common.annotations.VisibleForTesting;
2523
import com.google.common.base.Throwables;
@@ -35,7 +33,6 @@
3533
import org.apache.spark.network.sasl.SaslRpcHandler;
3634
import org.apache.spark.network.server.RpcHandler;
3735
import org.apache.spark.network.server.StreamManager;
38-
import org.apache.spark.network.util.JavaUtils;
3936
import org.apache.spark.network.util.TransportConf;
4037

4138
/**

common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222

2323
import com.google.common.base.Throwables;
2424
import io.netty.channel.Channel;
25-
import io.netty.channel.ChannelFuture;
26-
import io.netty.channel.ChannelFutureListener;
2725
import org.slf4j.Logger;
2826
import org.slf4j.LoggerFactory;
2927

@@ -189,21 +187,16 @@ private void processOneWayMessage(OneWayMessage req) {
189187
* Responds to a single message with some Encodable object. If a failure occurs while sending,
190188
* it will be logged and the channel closed.
191189
*/
192-
private void respond(final Encodable result) {
193-
final SocketAddress remoteAddress = channel.remoteAddress();
194-
channel.writeAndFlush(result).addListener(
195-
new ChannelFutureListener() {
196-
@Override
197-
public void operationComplete(ChannelFuture future) throws Exception {
198-
if (future.isSuccess()) {
199-
logger.trace("Sent result {} to client {}", result, remoteAddress);
200-
} else {
201-
logger.error(String.format("Error sending result %s to %s; closing connection",
202-
result, remoteAddress), future.cause());
203-
channel.close();
204-
}
205-
}
190+
private void respond(Encodable result) {
191+
SocketAddress remoteAddress = channel.remoteAddress();
192+
channel.writeAndFlush(result).addListener(future -> {
193+
if (future.isSuccess()) {
194+
logger.trace("Sent result {} to client {}", result, remoteAddress);
195+
} else {
196+
logger.error(String.format("Error sending result %s to %s; closing connection",
197+
result, remoteAddress), future.cause());
198+
channel.close();
206199
}
207-
);
200+
});
208201
}
209202
}

common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818
package org.apache.spark.network.crypto;
1919

2020
import java.util.Arrays;
21-
import java.util.Map;
2221
import static java.nio.charset.StandardCharsets.UTF_8;
2322

24-
import com.google.common.collect.ImmutableMap;
2523
import org.junit.BeforeClass;
2624
import org.junit.Test;
2725
import static org.junit.Assert.*;

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,8 @@ private ShuffleMetrics() {
190190
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
191191
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
192192
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
193-
allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
194-
@Override
195-
public Integer getValue() {
196-
return blockManager.getRegisteredExecutorsSize();
197-
}
198-
});
193+
allMetrics.put("registeredExecutorsSize",
194+
(Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
199195
}
200196

201197
@Override

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,7 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
205205
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
206206

207207
// Execute the actual deletion in a different thread, as it may take some time.
208-
directoryCleaner.execute(new Runnable() {
209-
@Override
210-
public void run() {
211-
deleteExecutorDirs(executor.localDirs);
212-
}
213-
});
208+
directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
214209
}
215210
}
216211
}

0 commit comments

Comments (0)