Skip to content

Commit be8ff0e

Browse files
committed
add doc
1 parent 4fd08a4 commit be8ff0e

File tree

3 files changed

+109
-24
lines changed

3 files changed

+109
-24
lines changed

core/src/main/scala/org/apache/spark/NewAccumulator.scala

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,21 @@ private[spark] case class AccumulatorMetadata(
3232
countFailedValues: Boolean) extends Serializable
3333

3434

35+
/**
36+
* The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of
37+
* type `OUT`. Implementations must define the following methods:
38+
* - isZero: tells whether this accumulator has a zero value or not. e.g. for a counter accumulator,
39+
* 0 is zero value; for a list accumulator, Nil is zero value.
40+
* - copyAndReset: creates a new copy of this accumulator, which has a zero value, i.e. calling `isZero`
41+
* on the copy must return true.
42+
* - add: defines how to accumulate the inputs. e.g. it can be a simple `+=` for counter
43+
* accumulator
44+
* - merge: defines how to merge another accumulator of the same type.
45+
* - localValue: defines how to produce the output from the current state of this accumulator.
46+
*
47+
* The implementations decide how to store intermediate values, e.g. a long field for a counter
48+
* accumulator, a double and a long field for an average accumulator (storing the sum and count).
49+
*/
3550
abstract class NewAccumulator[IN, OUT] extends Serializable {
3651
private[spark] var metadata: AccumulatorMetadata = _
3752
private[this] var atDriverSide = true
@@ -57,17 +72,17 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
5772
}
5873
}
5974

60-
def id: Long = {
75+
final def id: Long = {
6176
assertMetadataNotNull()
6277
metadata.id
6378
}
6479

65-
def name: Option[String] = {
80+
final def name: Option[String] = {
6681
assertMetadataNotNull()
6782
metadata.name
6883
}
6984

70-
def countFailedValues: Boolean = {
85+
final def countFailedValues: Boolean = {
7186
assertMetadataNotNull()
7287
metadata.countFailedValues
7388
}
@@ -79,10 +94,10 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
7994

8095
final private[spark] def isAtDriverSide: Boolean = atDriverSide
8196

82-
def copyAndReset(): NewAccumulator[IN, OUT]
83-
8497
def isZero(): Boolean
8598

99+
def copyAndReset(): NewAccumulator[IN, OUT]
100+
86101
def add(v: IN): Unit
87102

88103
def +=(v: IN): Unit = add(v)
@@ -100,7 +115,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
100115
def localValue: OUT
101116

102117
// Called by Java when serializing an object
103-
protected def writeReplace(): Any = {
118+
final protected def writeReplace(): Any = {
104119
if (atDriverSide) {
105120
if (!isRegistered) {
106121
throw new UnsupportedOperationException(
@@ -212,10 +227,10 @@ private[spark] object AccumulatorContext {
212227
class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
213228
private[this] var _sum = 0L
214229

215-
override def copyAndReset(): LongAccumulator = new LongAccumulator
216-
217230
override def isZero(): Boolean = _sum == 0
218231

232+
override def copyAndReset(): LongAccumulator = new LongAccumulator
233+
219234
override def add(v: jl.Long): Unit = _sum += v
220235

221236
def add(v: Long): Unit = _sum += v
@@ -237,10 +252,10 @@ class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
237252
class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] {
238253
private[this] var _sum = 0.0
239254

240-
override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
241-
242255
override def isZero(): Boolean = _sum == 0.0
243256

257+
override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
258+
244259
override def add(v: jl.Double): Unit = _sum += v
245260

246261
def add(v: Double): Unit = _sum += v
@@ -263,10 +278,10 @@ class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
263278
private[this] var _sum = 0.0
264279
private[this] var _count = 0L
265280

266-
override def copyAndReset(): AverageAccumulator = new AverageAccumulator
267-
268281
override def isZero(): Boolean = _sum == 0.0 && _count == 0
269282

283+
override def copyAndReset(): AverageAccumulator = new AverageAccumulator
284+
270285
override def add(v: jl.Double): Unit = {
271286
_sum += v
272287
_count += 1
@@ -297,17 +312,17 @@ class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
297312
}
298313

299314

300-
class CollectionAccumulator[T] extends NewAccumulator[T, java.util.List[T]] {
315+
class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] {
301316
private[this] val _list: java.util.List[T] = new java.util.ArrayList[T]
302317

303-
override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator
304-
305318
override def isZero(): Boolean = _list.isEmpty
306319

320+
override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
321+
307322
override def add(v: T): Unit = _list.add(v)
308323

309324
override def merge(other: NewAccumulator[T, java.util.List[T]]): Unit = other match {
310-
case o: CollectionAccumulator[T] => _list.addAll(o.localValue)
325+
case o: ListAccumulator[T] => _list.addAll(o.localValue)
311326
case _ => throw new UnsupportedOperationException(
312327
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
313328
}
@@ -321,14 +336,14 @@ class LegacyAccumulatorWrapper[R, T](
321336
param: org.apache.spark.AccumulableParam[R, T]) extends NewAccumulator[T, R] {
322337
private[spark] var _value = initialValue // Current value on driver
323338

339+
override def isZero(): Boolean = _value == param.zero(initialValue)
340+
324341
override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = {
325342
val acc = new LegacyAccumulatorWrapper(initialValue, param)
326343
acc._value = param.zero(initialValue)
327344
acc
328345
}
329346

330-
override def isZero(): Boolean = _value == param.zero(initialValue)
331-
332347
override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
333348

334349
override def merge(other: NewAccumulator[T, R]): Unit = other match {

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,27 +1276,97 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
12761276
acc
12771277
}
12781278

1279+
/**
1280+
* Register the given accumulator. Note that accumulators must be registered before use, or it
1281+
* will throw an exception.
1282+
*/
1283+
def register(acc: NewAccumulator[_, _]): Unit = {
1284+
acc.register(this)
1285+
}
1286+
1287+
/**
1288+
* Register the given accumulator with the given name. Note that accumulators must be registered
1289+
* before use, or it will throw an exception.
1290+
*/
1291+
def register(acc: NewAccumulator[_, _], name: String): Unit = {
1292+
acc.register(this, name = Some(name))
1293+
}
1294+
1295+
/**
1296+
* Create and register a long accumulator, which starts with 0 and accumulates inputs by `+=`.
1297+
*/
12791298
def longAccumulator: LongAccumulator = {
12801299
val acc = new LongAccumulator
1281-
acc.register(this)
1300+
register(acc)
12821301
acc
12831302
}
12841303

1304+
/**
1305+
* Create and register a long accumulator, which starts with 0 and accumulates inputs by `+=`.
1306+
*/
12851307
def longAccumulator(name: String): LongAccumulator = {
12861308
val acc = new LongAccumulator
1287-
acc.register(this, name = Some(name))
1309+
register(acc, name)
12881310
acc
12891311
}
12901312

1313+
/**
1314+
* Create and register a double accumulator, which starts with 0 and accumulates inputs by `+=`.
1315+
*/
12911316
def doubleAccumulator: DoubleAccumulator = {
12921317
val acc = new DoubleAccumulator
1293-
acc.register(this)
1318+
register(acc)
12941319
acc
12951320
}
12961321

1322+
/**
1323+
* Create and register a double accumulator, which starts with 0 and accumulates inputs by `+=`.
1324+
*/
12971325
def doubleAccumulator(name: String): DoubleAccumulator = {
12981326
val acc = new DoubleAccumulator
1299-
acc.register(this, name = Some(name))
1327+
register(acc, name)
1328+
acc
1329+
}
1330+
1331+
/**
1332+
* Create and register an average accumulator, which accumulates double inputs by recording the
1333+
* total sum and total count, and produces the output as sum / count. Note that Double.NaN will be
1334+
* returned if no input is added.
1335+
*/
1336+
def averageAccumulator: AverageAccumulator = {
1337+
val acc = new AverageAccumulator
1338+
register(acc)
1339+
acc
1340+
}
1341+
1342+
/**
1343+
* Create and register an average accumulator, which accumulates double inputs by recording the
1344+
* total sum and total count, and produce the output by sum / total. Note that Double.NaN will be
1345+
* returned if no input is added.
1346+
*/
1347+
def averageAccumulator(name: String): AverageAccumulator = {
1348+
val acc = new AverageAccumulator
1349+
register(acc, name)
1350+
acc
1351+
}
1352+
1353+
/**
1354+
* Create and register a list accumulator, which starts with an empty list and accumulates inputs
1355+
* by adding them into the inner list.
1356+
*/
1357+
def listAccumulator[T]: ListAccumulator[T] = {
1358+
val acc = new ListAccumulator[T]
1359+
register(acc)
1360+
acc
1361+
}
1362+
1363+
/**
1364+
* Create and register a list accumulator, which starts with an empty list and accumulates inputs
1365+
* by adding them into the inner list.
1366+
*/
1367+
def listAccumulator[T](name: String): ListAccumulator[T] = {
1368+
val acc = new ListAccumulator[T]
1369+
register(acc, name)
13001370
acc
13011371
}
13021372

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,10 @@ private[spark] class BlockStatusesAccumulator
292292
extends NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] {
293293
private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)]
294294

295-
override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator
296-
297295
override def isZero(): Boolean = _seq.isEmpty
298296

297+
override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator
298+
299299
override def add(v: (BlockId, BlockStatus)): Unit = _seq += v
300300

301301
override def merge(other: NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])

0 commit comments

Comments
 (0)