@@ -63,7 +63,7 @@ class Accumulable[R, T] private (
6363 param : AccumulableParam [R , T ],
6464 name : Option [String ],
6565 countFailedValues : Boolean ) = {
66- this (Accumulators .newId(), initialValue, param, name, countFailedValues)
66+ this (AccumulatorContext .newId(), initialValue, param, name, countFailedValues)
6767 }
6868
6969 private [spark] def this (initialValue : R , param : AccumulableParam [R , T ], name : Option [String ]) = {
@@ -72,61 +72,43 @@ class Accumulable[R, T] private (
7272
7373 def this (initialValue : R , param : AccumulableParam [R , T ]) = this (initialValue, param, None )
7474
75- @ volatile @ transient private var value_ : R = initialValue // Current value on driver
76- val zero = param.zero(initialValue) // Zero value to be passed to executors
77- private var deserialized = false
78-
79- Accumulators .register(this )
80-
81- /**
82- * Return a copy of this [[Accumulable ]].
83- *
84- * The copy will have the same ID as the original and will not be registered with
85- * [[Accumulators ]] again. This method exists so that the caller can avoid passing the
86- * same mutable instance around.
87- */
88- private [spark] def copy (): Accumulable [R , T ] = {
89- new Accumulable [R , T ](id, initialValue, param, name, countFailedValues)
90- }
75+ private [spark] val newAcc = new LegacyAccumulatorWrapper (initialValue, param)
76+ newAcc.metadata = AccumulatorMetadata (id, name, countFailedValues)
77+ // Register the new accumulator in ctor, to follow the previous behaviour.
78+ AccumulatorContext .register(newAcc)
9179
9280 /**
9381 * Add more data to this accumulator / accumulable
9482 * @param term the data to add
9583 */
96- def += (term : T ) { value_ = param.addAccumulator(value_, term) }
84+ def += (term : T ) { newAcc.add( term) }
9785
9886 /**
9987 * Add more data to this accumulator / accumulable
10088 * @param term the data to add
10189 */
102- def add (term : T ) { value_ = param.addAccumulator(value_, term) }
90+ def add (term : T ) { newAcc.add( term) }
10391
10492 /**
10593 * Merge two accumulable objects together
10694 *
10795 * Normally, a user will not want to use this version, but will instead call `+=`.
10896 * @param term the other `R` that will get merged with this
10997 */
110- def ++= (term : R ) { value_ = param.addInPlace(value_ , term)}
98+ def ++= (term : R ) { newAcc._value = param.addInPlace(newAcc._value , term) }
11199
112100 /**
113101 * Merge two accumulable objects together
114102 *
115103 * Normally, a user will not want to use this version, but will instead call `add`.
116104 * @param term the other `R` that will get merged with this
117105 */
118- def merge (term : R ) { value_ = param.addInPlace(value_ , term)}
106+ def merge (term : R ) { newAcc._value = param.addInPlace(newAcc._value , term) }
119107
120108 /**
121109 * Access the accumulator's current value; only allowed on driver.
122110 */
123- def value : R = {
124- if (! deserialized) {
125- value_
126- } else {
127- throw new UnsupportedOperationException (" Can't read accumulator value in task" )
128- }
129- }
111+ def value : R = newAcc.value
130112
131113 /**
132114 * Get the current value of this accumulator from within a task.
@@ -137,14 +119,14 @@ class Accumulable[R, T] private (
137119 * The typical use of this method is to directly mutate the local value, eg., to add
138120 * an element to a Set.
139121 */
140- def localValue : R = value_
122+ def localValue : R = newAcc.localValue
141123
142124 /**
143125 * Set the accumulator's value; only allowed on driver.
144126 */
145127 def value_= (newValue : R ) {
146- if (! deserialized ) {
147- value_ = newValue
128+ if (newAcc.isAtDriverSide ) {
129+ newAcc._value = newValue
148130 } else {
149131 throw new UnsupportedOperationException (" Can't assign accumulator value in task" )
150132 }
@@ -153,7 +135,7 @@ class Accumulable[R, T] private (
153135 /**
154136 * Set the accumulator's value. For internal use only.
155137 */
156- def setValue (newValue : R ): Unit = { value_ = newValue }
138+ def setValue (newValue : R ): Unit = { newAcc._value = newValue }
157139
158140 /**
159141 * Set the accumulator's value. For internal use only.
@@ -168,22 +150,7 @@ class Accumulable[R, T] private (
168150 new AccumulableInfo (id, name, update, value, isInternal, countFailedValues)
169151 }
170152
171- // Called by Java when deserializing an object
172- private def readObject (in : ObjectInputStream ): Unit = Utils .tryOrIOException {
173- in.defaultReadObject()
174- value_ = zero
175- deserialized = true
176-
177- // Automatically register the accumulator when it is deserialized with the task closure.
178- // This is for external accumulators and internal ones that do not represent task level
179- // metrics, e.g. internal SQL metrics, which are per-operator.
180- val taskContext = TaskContext .get()
181- if (taskContext != null ) {
182- taskContext.registerAccumulator(this )
183- }
184- }
185-
186- override def toString : String = if (value_ == null ) " null" else value_.toString
153+ override def toString : String = if (newAcc._value == null ) " null" else newAcc._value.toString
187154}
188155
189156
0 commit comments