Commit c0ef0c2

SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead of HyperLogLog.
1 parent 3b0baba commit c0ef0c2

6 files changed, +42 −71 lines


core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 21 additions & 7 deletions

@@ -28,7 +28,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag

-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
@@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.SerializableHyperLogLog

 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -218,14 +217,29 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * The accuracy of approximation can be controlled through the relative standard deviation
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    * more accurate counts but increase the memory footprint and vice versa. Uses the provided
-   * Partitioner to partition the output RDD.
+   * [[Partitioner]] to partition the output RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
    */
   def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
-    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
-    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
-    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+    val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
+    val createHLL = (v: V) => {
+      val hll = new HyperLogLogPlus(precision)
+      hll.offer(v)
+      hll
+    }
+    val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
+      hll.offer(v)
+      hll
+    }
+    val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+      h1.addAll(h2)
+      h1
+    }

-    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
   }

   /**

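The precision passed to the HyperLogLogPlus constructor comes from inverting the HyperLogLog error bound relativeSD ≈ 1.106 / sqrt(2^p), giving p = log2((1.106 / relativeSD)^2). For the default relativeSD of 0.05 this is log2(489.3) ≈ 8.9, which toInt truncates to p = 8, i.e. 2^8 = 256 registers. Below is a minimal standalone sketch of the same offer/addAll/cardinality semantics against streamlib 2.7.0; the object and item names are illustrative, not part of the patch:

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

object HllPlusDemo {
  // Same formula as the patch: relativeSD ~= 1.106 / sqrt(2^p), solved for p.
  def precisionFor(relativeSD: Double): Int =
    (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt

  def main(args: Array[String]): Unit = {
    val p = precisionFor(0.05)                     // default relativeSD -> p = 8
    val h1 = new HyperLogLogPlus(p)
    val h2 = new HyperLogLogPlus(p)
    (1 to 500).foreach(i => h1.offer(s"item-$i"))  // offer() mutates the sketch in place
    (250 to 750).foreach(i => h2.offer(s"item-$i"))
    h1.addAll(h2)                                  // merge h2 into h1, as mergeHLL does
    println(s"p=$p, estimate=${h1.cardinality()}, exact=750")
  }
}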
core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 17 additions & 4 deletions

@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.{classTag, ClassTag}

-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.NullWritable
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}

@@ -925,11 +925,24 @@ abstract class RDD[T: ClassTag](
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    * more accurate counts but increase the memory footprint and vice versa. The default value of
    * relativeSD is 0.05.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
    */
   @Experimental
   def countApproxDistinct(relativeSD: Double = 0.05): Long = {
-    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
-    aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+    val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
+    val zeroCounter = new HyperLogLogPlus(precision)
+    aggregate(zeroCounter)(
+      (hll: HyperLogLogPlus, v: T) => {
+        hll.offer(v)
+        hll
+      },
+      (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+        h1.addAll(h2)
+        h1
+      }).cardinality()
   }

   /**

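For reference, a driver-side usage sketch of the two public entry points, assuming a local-mode SparkContext (the app name, data, and partition count are illustrative). With relativeSD = 0.05 the estimates should typically land within about 5% of the exact distinct counts:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

object CountApproxDistinctDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "hll-plus-demo")

    // Per-key estimates: key k carries exactly k * 10 distinct values.
    val pairs = sc.parallelize(for (k <- 1 to 5; v <- 1 to k * 10) yield (k, v))
    pairs.countApproxDistinctByKey(0.05, 4).collect().foreach {
      case (k, est) => println(s"key=$k exact=${k * 10} estimate=$est")
    }

    // Global estimate over a whole RDD.
    println(sc.parallelize(1 to 100000).countApproxDistinct(0.05))
    sc.stop()
  }
}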
core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala

Lines changed: 0 additions & 52 deletions
This file was deleted: HyperLogLogPlus in streamlib 2.7.0 can be serialized directly (as its use inside aggregate and combineByKey closures requires), so Spark's hand-rolled serializable wrapper is no longer needed.

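For context, the deleted class followed the usual Externalizable-wrapper pattern; a rough sketch of that pattern (a reconstruction, not the verbatim deleted file):

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import com.clearspring.analytics.stream.cardinality.HyperLogLog

// Sketch of the wrapper pattern: the old HyperLogLog was not java-serializable,
// so its byte form was shipped through Externalizable and rebuilt on read.
class SerializableHyperLogLog(var value: HyperLogLog) extends Externalizable {
  def this() = this(null)  // no-arg constructor required by Externalizable

  def writeExternal(out: ObjectOutput): Unit = {
    val bytes = value.getBytes
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  def readExternal(in: ObjectInput): Unit = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    value = HyperLogLog.Builder.build(bytes)
  }
}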
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 2 additions & 6 deletions

@@ -126,9 +126,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
     val rdd1 = sc.parallelize(stacked)
     val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
-    counted1.foreach{
-      case(k, count) => assert(error(count, k) < relativeSD)
-    }
+    counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) }

     val rnd = new Random()

@@ -139,9 +137,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     }
     val rdd2 = sc.parallelize(randStacked)
     val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
-    counted2.foreach{
-      case(k, count) => assert(error(count, k) < relativeSD)
-    }
+    counted2.foreach { case (k, count) => assert(error(count, k) < relativeSD) }
   }

   test("join") {

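The error helper in these assertions is defined elsewhere in the suite and is not part of this diff; a definition consistent with how the assertions read (the signature is an assumption) is the relative error of the estimate against the exact count, which for key k is k itself:

// Assumed helper, not taken from the patch: relative error of an estimate
// versus the exact distinct count (for key k the exact count is k).
def error(est: Long, exact: Long): Double = math.abs(est - exact).toDouble / exact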
pom.xml

Lines changed: 1 addition & 1 deletion

@@ -284,7 +284,7 @@
     <dependency>
       <groupId>com.clearspring.analytics</groupId>
       <artifactId>stream</artifactId>
-      <version>2.5.1</version>
+      <version>2.7.0</version>
       <exclusions>
         <!-- Only HyperLogLog is used, which doesn't depend on fastutil -->
         <exclusion>

project/SparkBuild.scala

Lines changed: 1 addition & 1 deletion

@@ -358,7 +358,7 @@ object SparkBuild extends Build {
       "com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm),
       "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
       "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
-      "com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil),
+      "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil),
       "org.spark-project" % "pyrolite" % "2.0.1",
       "net.sf.py4j" % "py4j" % "0.8.1"
     ),
