
Commit 507b00c

Merge remote-tracking branch 'upstream/master' into remove_analyzed_from_create_temp_view
2 parents: a14b3b9 + 7838f55

65 files changed: +3705 -937 lines changed


core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 5 additions & 5 deletions
@@ -172,18 +172,18 @@ private[spark] class ContextCleaner(
       registerForCleanup(rdd, CleanCheckpoint(parentId))
   }

-  /** Register an object for cleanup. */
-  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
-    referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
-  }
-
   /** Register a SparkListener to be cleaned up when its owner is garbage collected. */
   def registerSparkListenerForCleanup(
       listenerOwner: AnyRef,
       listener: SparkListener): Unit = {
     registerForCleanup(listenerOwner, CleanSparkListener(listener))
   }

+  /** Register an object for cleanup. */
+  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
+    referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
+  }
+
   /** Keep cleaning RDD, shuffle, and broadcast state. */
   private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
     while (!stopped) {
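
The moved method is otherwise unchanged; it is the weak-reference registration that the rest of ContextCleaner is built on. A minimal, self-contained sketch of that pattern, with illustrative names rather than Spark's own:

import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.ConcurrentLinkedQueue

// Hold only a weak reference to the owner; a polling thread drains the queue
// once the owner has been garbage collected and runs the associated cleanup task.
class CleanupRef(val task: () => Unit, owner: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](owner, queue)

val referenceQueue = new ReferenceQueue[AnyRef]
val referenceBuffer = new ConcurrentLinkedQueue[CleanupRef]   // keeps the weak refs themselves reachable

def registerForCleanup(owner: AnyRef, task: () => Unit): Unit =
  referenceBuffer.add(new CleanupRef(task, owner, referenceQueue))

def drainOnce(): Unit = referenceQueue.poll() match {
  case ref: CleanupRef => ref.task(); referenceBuffer.remove(ref)
  case _ => // nothing has been collected yet
}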

core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala

Lines changed: 17 additions & 5 deletions
@@ -48,7 +48,12 @@ private[deploy] class HadoopFSDelegationTokenProvider
       creds: Credentials): Option[Long] = {
     try {
       val fileSystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
-      val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds)
+      // The hosts on which the file systems to be excluded from token renewal
+      val fsToExclude = sparkConf.get(YARN_KERBEROS_FILESYSTEM_RENEWAL_EXCLUDE)
+        .map(new Path(_).getFileSystem(hadoopConf).getUri.getHost)
+        .toSet
+      val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds,
+        fsToExclude)

       // Get the token renewal interval if it is not set. It will only be called once.
       if (tokenRenewalInterval == null) {

@@ -99,11 +104,18 @@ private[deploy] class HadoopFSDelegationTokenProvider
   private def fetchDelegationTokens(
       renewer: String,
       filesystems: Set[FileSystem],
-      creds: Credentials): Credentials = {
+      creds: Credentials,
+      fsToExclude: Set[String]): Credentials = {

     filesystems.foreach { fs =>
-      logInfo(s"getting token for: $fs with renewer $renewer")
-      fs.addDelegationTokens(renewer, creds)
+      if (fsToExclude.contains(fs.getUri.getHost)) {
+        // YARN RM skips renewing token with empty renewer
+        logInfo(s"getting token for: $fs with empty renewer to skip renewal")
+        fs.addDelegationTokens("", creds)
+      } else {
+        logInfo(s"getting token for: $fs with renewer $renewer")
+        fs.addDelegationTokens(renewer, creds)
+      }
     }

     creds

@@ -119,7 +131,7 @@ private[deploy] class HadoopFSDelegationTokenProvider
     val renewer = UserGroupInformation.getCurrentUser().getUserName()

     val creds = new Credentials()
-    fetchDelegationTokens(renewer, filesystems, creds)
+    fetchDelegationTokens(renewer, filesystems, creds, Set.empty)

     val renewIntervals = creds.getAllTokens.asScala.filter {
       _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
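
The exclusion key is the URI host of each configured filesystem, so one entry covers every path served by that namenode. A hedged sketch of the host extraction (hypothetical URL; assumes an HDFS client is on the classpath):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
// For "hdfs://nameservice1/user/shared" this yields "nameservice1"; tokens for any
// filesystem with that host are then fetched with an empty renewer and skipped by the RM.
val excludedHost = new Path("hdfs://nameservice1/user/shared")
  .getFileSystem(hadoopConf)
  .getUri
  .getHost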

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 12 additions & 0 deletions
@@ -716,6 +716,18 @@ package object config {
     .toSequence
     .createWithDefault(Nil)

+  private[spark] val YARN_KERBEROS_FILESYSTEM_RENEWAL_EXCLUDE =
+    ConfigBuilder("spark.yarn.kerberos.renewal.excludeHadoopFileSystems")
+      .doc("The list of Hadoop filesystem URLs whose hosts will be excluded from " +
+        "delegation token renewal at resource scheduler. Currently this is known to " +
+        "work under YARN, so YARN Resource Manager won't renew tokens for the application. " +
+        "Note that as resource scheduler does not renew token, so any application running " +
+        "longer than the original token expiration that tries to use that token will likely fail.")
+      .version("3.2.0")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .version("1.0.0")
     .intConf
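
Since the value is a string sequence, multiple filesystems can be listed comma-separated. A hypothetical usage sketch (the URLs below are examples, not taken from the commit):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.kerberos.renewal.excludeHadoopFileSystems",
    "hdfs://remote-nn1.example.com:8020,webhdfs://remote-gateway.example.com")
// Tokens for these filesystems are obtained with an empty renewer, so the YARN
// ResourceManager will not attempt (and fail) to renew them on the application's behalf.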

core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala

Lines changed: 15 additions & 14 deletions
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import org.apache.avro.{Schema, SchemaNormalization}
-import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.generic.{GenericContainer, GenericData}
 import org.apache.avro.io._
 import org.apache.commons.io.IOUtils

@@ -35,17 +35,18 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.Utils

 /**
- * Custom serializer used for generic Avro records. If the user registers the schemas
+ * Custom serializer used for generic Avro containers. If the user registers the schemas
  * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
  * schema, as to reduce network IO.
  * Actions like parsing or compressing schemas are computationally expensive so the serializer
  * caches all previously seen values as to reduce the amount of work needed to do.
  * @param schemas a map where the keys are unique IDs for Avro schemas and the values are the
  *                string representation of the Avro schema, used to decrease the amount of data
  *                that needs to be serialized.
+ * @tparam D the subtype of [[GenericContainer]] handled by this serializer
  */
-private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
-  extends KSerializer[GenericRecord] {
+private[serializer] class GenericAvroSerializer[D <: GenericContainer]
+  (schemas: Map[Long, String]) extends KSerializer[D] {

   /** Used to reduce the amount of effort to compress the schema */
   private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()

@@ -100,10 +101,10 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
   })

   /**
-   * Serializes a record to the given output stream. It caches a lot of the internal data as
-   * to not redo work
+   * Serializes a generic container to the given output stream. It caches a lot of the internal
+   * data as to not redo work
    */
-  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
+  def serializeDatum(datum: D, output: KryoOutput): Unit = {
     val encoder = EncoderFactory.get.binaryEncoder(output, null)
     val schema = datum.getSchema
     val fingerprint = fingerprintCache.getOrElseUpdate(schema, {

@@ -121,16 +122,16 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
     }

     writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
-      .asInstanceOf[DatumWriter[R]]
+      .asInstanceOf[DatumWriter[D]]
       .write(datum, encoder)
     encoder.flush()
   }

   /**
-   * Deserializes generic records into their in-memory form. There is internal
+   * Deserializes generic containers into their in-memory form. There is internal
    * state to keep a cache of already seen schemas and datum readers.
    */
-  def deserializeDatum(input: KryoInput): GenericRecord = {
+  def deserializeDatum(input: KryoInput): D = {
     val schema = {
       if (input.readBoolean()) {
         val fingerprint = input.readLong()

@@ -151,13 +152,13 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
     }
     val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
     readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
-      .asInstanceOf[DatumReader[GenericRecord]]
-      .read(null, decoder)
+      .asInstanceOf[DatumReader[D]]
+      .read(null.asInstanceOf[D], decoder)
   }

-  override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
+  override def write(kryo: Kryo, output: KryoOutput, datum: D): Unit =
     serializeDatum(datum, output)

-  override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
+  override def read(kryo: Kryo, input: KryoInput, datumClass: Class[D]): D =
     deserializeDatum(input)
 }
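
The `schemas: Map[Long, String]` parameter is populated when the user registers schemas up front, so only the 64-bit fingerprint travels with each datum. A short sketch of that registration path (the record and field names are illustrative):

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.spark.SparkConf

val userSchema = SchemaBuilder.record("User").fields()
  .requiredString("name")
  .requiredInt("age")
  .endRecord()

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerAvroSchemas(userSchema)   // feeds the fingerprint -> schema map used above

val user: GenericRecord = new GenericData.Record(userSchema)
user.put("name", "alice")
user.put("age", 30)
// When `user` is shipped with Kryo, only the schema fingerprint is written, not the full schema.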

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 13 additions & 3 deletions
@@ -33,7 +33,7 @@ import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutpu
 import com.esotericsoftware.kryo.pool.{KryoCallback, KryoFactory, KryoPool}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
-import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.generic.{GenericContainer, GenericData, GenericRecord}
 import org.roaringbitmap.RoaringBitmap

 import org.apache.spark._

@@ -153,8 +153,18 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    // Register serializers for Avro GenericContainer classes
+    // We do not handle SpecificRecordBase and SpecificFixed here. They are abstract classes and
+    // we will need to register serializers for their concrete implementations individually.
+    // Also, their serialization requires the use of SpecificDatum(Reader|Writer) instead of
+    // GenericDatum(Reader|Writer).
+    def registerAvro[T <: GenericContainer]()(implicit ct: ClassTag[T]): Unit =
+      kryo.register(ct.runtimeClass, new GenericAvroSerializer[T](avroSchemas))
+    registerAvro[GenericRecord]
+    registerAvro[GenericData.Record]
+    registerAvro[GenericData.Array[_]]
+    registerAvro[GenericData.EnumSymbol]
+    registerAvro[GenericData.Fixed]

     // Use the default classloader when calling the user registrator.
     Utils.withContextClassLoader(classLoader) {
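
With the extra registrations, non-record containers such as enum symbols and fixed types now round-trip through Kryo as well. A minimal sketch of what that enables, exercised against the serializer directly (the schema is illustrative):

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val colorSchema = SchemaBuilder.enumeration("Color").symbols("RED", "GREEN", "BLUE")
val symbol = new GenericData.EnumSymbol(colorSchema, "GREEN")

// Registering the schema keeps the on-wire payload to a fingerprint plus the symbol itself.
val conf = new SparkConf().registerAvroSchemas(colorSchema)
val serde = new KryoSerializer(conf).newInstance()
val restored = serde.deserialize[GenericData.EnumSymbol](serde.serialize(symbol))
assert(restored.toString == "GREEN")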

core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala

Lines changed: 4 additions & 2 deletions
@@ -51,15 +51,17 @@ private[spark] class BlockStoreShuffleReader[K, C](
       true
     }
     val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
+    // SPARK-34790: Fetching continuous blocks in batch is incompatible with io encryption.
+    val ioEncryption = conf.get(config.IO_ENCRYPTION_ENABLED)

     val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
-      (!compressed || codecConcatenation) && !useOldFetchProtocol
+      (!compressed || codecConcatenation) && !useOldFetchProtocol && !ioEncryption
     if (shouldBatchFetch && !doBatchFetch) {
       logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
         "we can not enable the feature because other conditions are not satisfied. " +
         s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " +
         s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " +
-        s"$useOldFetchProtocol.")
+        s"$useOldFetchProtocol, io encryption: $ioEncryption.")
     }
     doBatchFetch
   }
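
In practice the two features cannot be combined: with I/O encryption enabled, batch fetching stays off even when requested. A hedged configuration sketch (the SQL flag below is the usual requester of batch fetch and is an assumption here, not part of this diff):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.io.encryption.enabled", "true")                    // config.IO_ENCRYPTION_ENABLED
  .set("spark.sql.adaptive.fetchShuffleBlocksInBatch", "true")   // assumed flag requesting batch fetch
// After SPARK-34790, doBatchFetch evaluates to false in this setup and the debug log above
// reports "io encryption: true" as the reason.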
