@@ -32,6 +32,21 @@ private[spark] case class AccumulatorMetadata(
3232 countFailedValues : Boolean ) extends Serializable
3333
3434
35+ /**
36+ * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of
37+ * type `OUT`. Implementations must define following methods:
38+ * - isZero: tell if this accumulator is zero value or not. e.g. for a counter accumulator,
39+ * 0 is zero value; for a list accumulator, Nil is zero value.
40+ * - copyAndReset: create a new copy of this accumulator, which is zero value. i.e. call `isZero`
41+ * on the copy must return true.
42+ * - add: defines how to accumulate the inputs. e.g. it can be a simple `+=` for counter
43+ * accumulator
44+ * - merge: defines how to merge another accumulator of same type.
45+ * - localValue: defines how to produce the output by the current state of this accumulator.
46+ *
47+ * The implementations decide how to store intermediate values, e.g. a long field for a counter
48+ * accumulator, a double and a long field for a average accumulator(storing the sum and count).
49+ */
3550abstract class NewAccumulator [IN , OUT ] extends Serializable {
3651 private [spark] var metadata : AccumulatorMetadata = _
3752 private [this ] var atDriverSide = true
@@ -57,17 +72,17 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
5772 }
5873 }
5974
60- def id : Long = {
75+ final def id : Long = {
6176 assertMetadataNotNull()
6277 metadata.id
6378 }
6479
65- def name : Option [String ] = {
80+ final def name : Option [String ] = {
6681 assertMetadataNotNull()
6782 metadata.name
6883 }
6984
70- def countFailedValues : Boolean = {
85+ final def countFailedValues : Boolean = {
7186 assertMetadataNotNull()
7287 metadata.countFailedValues
7388 }
@@ -79,10 +94,10 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
7994
8095 final private [spark] def isAtDriverSide : Boolean = atDriverSide
8196
82- def copyAndReset (): NewAccumulator [IN , OUT ]
83-
8497 def isZero (): Boolean
8598
99+ def copyAndReset (): NewAccumulator [IN , OUT ]
100+
86101 def add (v : IN ): Unit
87102
88103 def += (v : IN ): Unit = add(v)
@@ -100,7 +115,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
100115 def localValue : OUT
101116
102117 // Called by Java when serializing an object
103- protected def writeReplace (): Any = {
118+ final protected def writeReplace (): Any = {
104119 if (atDriverSide) {
105120 if (! isRegistered) {
106121 throw new UnsupportedOperationException (
@@ -212,10 +227,10 @@ private[spark] object AccumulatorContext {
212227class LongAccumulator extends NewAccumulator [jl.Long , jl.Long ] {
213228 private [this ] var _sum = 0L
214229
215- override def copyAndReset (): LongAccumulator = new LongAccumulator
216-
217230 override def isZero (): Boolean = _sum == 0
218231
232+ override def copyAndReset (): LongAccumulator = new LongAccumulator
233+
219234 override def add (v : jl.Long ): Unit = _sum += v
220235
221236 def add (v : Long ): Unit = _sum += v
@@ -237,10 +252,10 @@ class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
237252class DoubleAccumulator extends NewAccumulator [jl.Double , jl.Double ] {
238253 private [this ] var _sum = 0.0
239254
240- override def copyAndReset (): DoubleAccumulator = new DoubleAccumulator
241-
242255 override def isZero (): Boolean = _sum == 0.0
243256
257+ override def copyAndReset (): DoubleAccumulator = new DoubleAccumulator
258+
244259 override def add (v : jl.Double ): Unit = _sum += v
245260
246261 def add (v : Double ): Unit = _sum += v
@@ -263,10 +278,10 @@ class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
263278 private [this ] var _sum = 0.0
264279 private [this ] var _count = 0L
265280
266- override def copyAndReset (): AverageAccumulator = new AverageAccumulator
267-
268281 override def isZero (): Boolean = _sum == 0.0 && _count == 0
269282
283+ override def copyAndReset (): AverageAccumulator = new AverageAccumulator
284+
270285 override def add (v : jl.Double ): Unit = {
271286 _sum += v
272287 _count += 1
@@ -297,17 +312,17 @@ class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
297312}
298313
299314
300- class CollectionAccumulator [T ] extends NewAccumulator [T , java.util.List [T ]] {
315+ class ListAccumulator [T ] extends NewAccumulator [T , java.util.List [T ]] {
301316 private [this ] val _list : java.util.List [T ] = new java.util.ArrayList [T ]
302317
303- override def copyAndReset (): CollectionAccumulator [T ] = new CollectionAccumulator
304-
305318 override def isZero (): Boolean = _list.isEmpty
306319
320+ override def copyAndReset (): ListAccumulator [T ] = new ListAccumulator
321+
307322 override def add (v : T ): Unit = _list.add(v)
308323
309324 override def merge (other : NewAccumulator [T , java.util.List [T ]]): Unit = other match {
310- case o : CollectionAccumulator [T ] => _list.addAll(o.localValue)
325+ case o : ListAccumulator [T ] => _list.addAll(o.localValue)
311326 case _ => throw new UnsupportedOperationException (
312327 s " Cannot merge ${this .getClass.getName} with ${other.getClass.getName}" )
313328 }
@@ -321,14 +336,14 @@ class LegacyAccumulatorWrapper[R, T](
321336 param : org.apache.spark.AccumulableParam [R , T ]) extends NewAccumulator [T , R ] {
322337 private [spark] var _value = initialValue // Current value on driver
323338
339+ override def isZero (): Boolean = _value == param.zero(initialValue)
340+
324341 override def copyAndReset (): LegacyAccumulatorWrapper [R , T ] = {
325342 val acc = new LegacyAccumulatorWrapper (initialValue, param)
326343 acc._value = param.zero(initialValue)
327344 acc
328345 }
329346
330- override def isZero (): Boolean = _value == param.zero(initialValue)
331-
332347 override def add (v : T ): Unit = _value = param.addAccumulator(_value, v)
333348
334349 override def merge (other : NewAccumulator [T , R ]): Unit = other match {
0 commit comments