Skip to content

Commit ba55d20

Browse files
committed
Add explanatory comments
1 parent 3f1da96 commit ba55d20

File tree

2 files changed

28 additions and 5 deletions (+28 / -5 lines changed)

2 files changed

+28
-5
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.serializer
1919

2020
import java.io.{EOFException, IOException, InputStream, OutputStream}
2121
import java.nio.ByteBuffer
22+
import javax.annotation.Nullable
2223

2324
import scala.reflect.ClassTag
2425

@@ -202,26 +203,43 @@ class KryoDeserializationStream(
202203

203204
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
204205

205-
private[this] var cachedKryo: Kryo = ks.newKryo()
206+
/**
207+
* A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
208+
* their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
209+
* pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
210+
* not synchronized.
211+
*/
212+
@Nullable private[this] var cachedKryo: Kryo = null
206213

207-
private[spark] def borrowKryo(): Kryo = {
214+
/**
215+
* Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
216+
* otherwise, it allocates a new instance.
217+
*/
218+
private[serializer] def borrowKryo(): Kryo = {
208219
if (cachedKryo != null) {
209220
val kryo = cachedKryo
210-
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
221+
// As a defensive measure, call reset() to clear any Kryo state that might have been modified
222+
// by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
223+
kryo.reset()
211224
cachedKryo = null
212225
kryo
213226
} else {
214227
ks.newKryo()
215228
}
216229
}
217230

218-
private[spark] def releaseKryo(kryo: Kryo): Unit = {
231+
/**
232+
* Releases a borrowed [[Kryo]] instance. If this serializer instance already has a cached Kryo
233+
* instance, then the given Kryo instance is discarded; otherwise, the Kryo is stored for later
234+
* re-use.
235+
*/
236+
private[serializer] def releaseKryo(kryo: Kryo): Unit = {
219237
if (cachedKryo == null) {
220238
cachedKryo = kryo
221239
}
222240
}
223241

224-
// Make these lazy vals to avoid creating a buffer unless we use them
242+
// Make these lazy vals to avoid creating a buffer unless we use them.
225243
private lazy val output = ks.newKryoOutput()
226244
private lazy val input = new KryoInput()
227245

core/src/main/scala/org/apache/spark/serializer/Serializer.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.serializer
1919

2020
import java.io._
2121
import java.nio.ByteBuffer
22+
import javax.annotation.concurrent.NotThreadSafe
2223

2324
import scala.reflect.ClassTag
2425

@@ -114,8 +115,12 @@ object Serializer {
114115
/**
115116
* :: DeveloperApi ::
116117
* An instance of a serializer, for use by one thread at a time.
118+
*
119+
* It is legal to create multiple serialization / deserialization streams from the same
120+
* SerializerInstance as long as those streams are all used within the same thread.
117121
*/
118122
@DeveloperApi
123+
@NotThreadSafe
119124
abstract class SerializerInstance {
120125
def serialize[T: ClassTag](t: T): ByteBuffer
121126

0 commit comments

Comments
 (0)