Skip to content

Commit fcf005e

Browse files
committed
Merge pull request #194 from stephanh/more-aggregators
Added Aggregators for BloomFilter, CountMinSketch, MapSketch, HyperLogLog, Moments, Min, Max
2 parents cd499b5 + 8452bd6 commit fcf005e

12 files changed

+217
-4
lines changed

algebird-core/src/main/scala/com/twitter/algebird/BloomFilter.scala

+10
Original file line numberDiff line numberDiff line change
@@ -338,3 +338,13 @@ case class BFHash(numHashes: Int, width: Int, seed: Long = 0L) extends Function1
338338
}
339339
}
340340

341+
case class BloomFilterAggregator(bfMonoid: BloomFilterMonoid) extends MonoidAggregator[String, BF, BF] {
342+
val monoid = bfMonoid
343+
344+
def prepare(value: String) = monoid.create(value)
345+
def present(bf: BF) = bf
346+
}
347+
348+
object BloomFilterAggregator {
349+
def apply(numHashes: Int, width: Int, seed: Int = 0): BloomFilterAggregator = BloomFilterAggregator(BloomFilterMonoid(numHashes, width, seed))
350+
}

algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala

+23-2
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,22 @@ class CountMinSketchMonoid(eps : Double, delta : Double, seed : Int,
111111
}
112112

113113
object CMS {
114-
def monoid(eps : Double, delta : Double, seed : Int, heavyHittersPct : Double = 0.01) =
114+
def monoid(eps : Double, delta : Double, seed : Int, heavyHittersPct : Double = 0.01): CountMinSketchMonoid =
115115
new CountMinSketchMonoid(eps, delta, seed, heavyHittersPct)
116116

117-
def monoid(depth : Int, width : Int, seed : Int, heavyHittersPct : Double) =
117+
def monoid(depth : Int, width : Int, seed : Int, heavyHittersPct : Double): CountMinSketchMonoid =
118118
new CountMinSketchMonoid(CMS.eps(width), CMS.delta(depth), seed, heavyHittersPct)
119119

120+
def aggregator(eps: Double, delta: Double, seed: Int, heavyHittersPct: Double = 0.01): CountMinSketchAggregator = {
121+
val monoid = new CountMinSketchMonoid(eps, delta, seed, heavyHittersPct)
122+
new CountMinSketchAggregator(monoid)
123+
}
124+
125+
def aggregator(depth : Int, width : Int, seed : Int, heavyHittersPct : Double): CountMinSketchAggregator = {
126+
val monoid = new CountMinSketchMonoid(CMS.eps(width), CMS.delta(depth), seed, heavyHittersPct)
127+
new CountMinSketchAggregator(monoid)
128+
}
129+
120130
/**
121131
* Functions to translate between (eps, delta) and (depth, width). The translation is:
122132
* depth = ceil(ln 1/delta)
@@ -453,3 +463,14 @@ case class HeavyHitter(item : Long, count : Long)
453463
object HeavyHitter {
454464
val ordering = Ordering.by { hh : HeavyHitter => (hh.count, hh.item) }
455465
}
466+
467+
/**
468+
* An Aggregator for the CountMinSketch.
469+
* Can be created using CMS.aggregator
470+
*/
471+
case class CountMinSketchAggregator(cmsMonoid : CountMinSketchMonoid) extends MonoidAggregator[Long, CMS, CMS] {
472+
val monoid = cmsMonoid
473+
474+
def prepare(value : Long) = monoid.create(value)
475+
def present(cms : CMS) = cms
476+
}

algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala

+18
Original file line numberDiff line numberDiff line change
@@ -358,3 +358,21 @@ class HyperLogLogMonoid(val bits : Int) extends Monoid[HLL] {
358358
.getOrElse(Approximate.exact(0L)) //Empty lists have no intersection
359359
}
360360
}
361+
362+
object HyperLogLogAggregator {
363+
def apply(bits: Int): HyperLogLogAggregator = {
364+
val monoid = new HyperLogLogMonoid(bits)
365+
new HyperLogLogAggregator(monoid)
366+
}
367+
368+
def sizeAggregator(bits: Int): Aggregator[Array[Byte], HLL, Double] = {
369+
apply(bits).andThenPresent(_.estimatedSize)
370+
}
371+
}
372+
373+
case class HyperLogLogAggregator(val hllMonoid: HyperLogLogMonoid) extends MonoidAggregator[Array[Byte], HLL, HLL] {
374+
val monoid = hllMonoid
375+
376+
def prepare(value: Array[Byte]) = monoid.create(value)
377+
def present(hll: HLL) = hll
378+
}

algebird-core/src/main/scala/com/twitter/algebird/MomentsGroup.scala

+9-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ case class Moments(m0 : Long, m1 : Double, m2 : Double, m3 : Double, m4 : Double
4141

4242
object Moments {
4343
implicit val group = MomentsGroup
44-
44+
implicit val aggregator = MomentsAggregator
45+
4546
// Create a Moments object given a single value. This is useful for
4647
// initializing moment calculations at the start of a stream.
4748
def apply[V <% Double](value : V) = new Moments(1L, value, 0, 0, 0)
@@ -126,3 +127,10 @@ object MomentsGroup extends Group[Moments] {
126127
}
127128
}
128129
}
130+
131+
object MomentsAggregator extends MonoidAggregator[Double, Moments, Moments] {
132+
val monoid = MomentsGroup
133+
134+
def prepare(input: Double): Moments = Moments(input)
135+
def present(m: Moments) = m
136+
}

algebird-core/src/main/scala/com/twitter/algebird/OrderedSemigroup.scala

+16
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ object Max {
2828
def monoid[T](zero: => T)(implicit ord: Ordering[T]): Monoid[Max[T]] =
2929
Monoid.from(Max(zero)) { (l,r) => if(ord.gteq(l.get, r.get)) l else r }
3030

31+
def aggregator[T](implicit ord:Ordering[T]): MaxAggregator[T] = MaxAggregator()(ord)
32+
3133
implicit def intMonoid: Monoid[Max[Int]] = monoid(Int.MinValue)
3234
implicit def longMonoid: Monoid[Max[Long]] = monoid(Long.MinValue)
3335
implicit def doubleMonoid: Monoid[Max[Double]] = monoid(Double.MinValue)
@@ -61,6 +63,8 @@ object Min {
6163
def monoid[T](zero: => T)(implicit ord: Ordering[T]): Monoid[Min[T]] =
6264
Monoid.from(Min(zero)) { (l,r) => if(ord.lteq(l.get, r.get)) l else r }
6365

66+
def aggregator[T](implicit ord:Ordering[T]): MinAggregator[T] = MinAggregator()(ord)
67+
6468
implicit def intMonoid: Monoid[Min[Int]] = monoid(Int.MaxValue)
6569
implicit def longMonoid: Monoid[Min[Long]] = monoid(Long.MaxValue)
6670
implicit def doubleMonoid: Monoid[Min[Double]] = monoid(Double.MaxValue)
@@ -78,3 +82,15 @@ case class Last[@specialized(Int,Long,Float,Double) +T](get: T)
7882
object Last {
7983
implicit def semigroup[T] = Semigroup.from[Last[T]] { (l,r) => r }
8084
}
85+
86+
case class MinAggregator[T](implicit ord: Ordering[T]) extends Aggregator[T, T, T] {
87+
def prepare(v: T) = v
88+
def reduce(l: T, r: T) = ord.min(l , r)
89+
def present(v: T) = v
90+
}
91+
92+
case class MaxAggregator[T](implicit ord: Ordering[T]) extends Aggregator[T, T, T] {
93+
def prepare(v: T) = v
94+
def reduce(l: T, r: T) = ord.max(l , r)
95+
def present(v: T) = v
96+
}

algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala

+15
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ object SketchMap {
128128
(implicit serialization: K => Array[Byte], valueOrdering: Ordering[V], monoid: Monoid[V]): SketchMapMonoid[K, V] = {
129129
new SketchMapMonoid(width(eps), depth(delta), seed, heavyHittersCount)(serialization, valueOrdering, monoid)
130130
}
131+
132+
def aggregator[K, V](eps: Double, delta: Double, seed: Int, heavyHittersCount: Int)
133+
(implicit serialization: K => Array[Byte], valueOrdering: Ordering[V], monoid: Monoid[V]): SketchMapAggregator[K, V] = {
134+
SketchMapAggregator(SketchMap.monoid(eps, delta, seed, heavyHittersCount))
135+
}
131136
}
132137

133138
case class SketchMap[K, V](
@@ -225,3 +230,13 @@ case class SketchMap[K, V](
225230
}
226231
}
227232

233+
/**
234+
* An Aggregator for the SketchMap.
235+
* Can be created using SketchMap.aggregator
236+
*/
237+
case class SketchMapAggregator[K, V](skmMonoid : SketchMapMonoid[K, V]) extends MonoidAggregator[(K, V), SketchMap[K, V], SketchMap[K, V]] {
238+
val monoid = skmMonoid
239+
240+
def prepare(value: (K,V)) = monoid.create(value)
241+
def present(skm: SketchMap[K, V]) = skm
242+
}

algebird-test/src/test/scala/com/twitter/algebird/BloomFilterTest.scala

+15
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,20 @@ class BloomFilterTest extends Specification {
8484
size.max must be_>=(size.estimate)
8585
}
8686
}
87+
88+
"work as an Aggregator" in {
89+
(0 to 10).foreach{
90+
_ => {
91+
val aggregator = BloomFilterAggregator(RAND.nextInt(5)+1, RAND.nextInt(64)+32, SEED)
92+
val numEntries = 5
93+
val entries = (0 until numEntries).map(_ => RAND.nextInt.toString)
94+
val bf = aggregator(entries)
95+
96+
entries.foreach{
97+
i => bf.contains(i.toString).isTrue must be_==(true)
98+
}
99+
}
100+
}
101+
}
87102
}
88103
}

algebird-test/src/test/scala/com/twitter/algebird/CountMinSketchTest.scala

+13
Original file line numberDiff line numberDiff line change
@@ -196,5 +196,18 @@ class CountMinSketchTest extends Specification {
196196
cms3.heavyHitters must be_==(Set(5L))
197197
cms4.heavyHitters must be_==(Set[Long]())
198198
}
199+
200+
"work as an Aggregator" in {
201+
val data1 = Seq(1L, 2L, 2L, 3L, 3L, 3L, 4L, 4L, 4L, 4L, 5L, 5L, 5L, 5L, 5L)
202+
val cms1 = CMS.aggregator(EPS, DELTA, SEED, 0.01).apply(data1)
203+
val cms2 = CMS.aggregator(EPS, DELTA, SEED, 0.1).apply(data1)
204+
val cms3 = CMS.aggregator(EPS, DELTA, SEED, 0.3).apply(data1)
205+
val cms4 = CMS.aggregator(EPS, DELTA, SEED, 0.9).apply(data1)
206+
207+
cms1.heavyHitters must be_==(Set(1L, 2L, 3L, 4L, 5L))
208+
cms2.heavyHitters must be_==(Set(2L, 3L, 4L, 5L))
209+
cms3.heavyHitters must be_==(Set(5L))
210+
cms4.heavyHitters must be_==(Set[Long]())
211+
}
199212
}
200213
}

algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogTest.scala

+22
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,28 @@ class HyperLogLogTest extends Specification {
146146
}
147147
}
148148

149+
"work as an Aggregator and return a HLL" in {
150+
List(5, 7, 10).foreach(bits => {
151+
val aggregator = HyperLogLogAggregator(bits)
152+
val data = (0 to 10000).map { i => r.nextInt(1000) }
153+
val exact = exactCount(data).toDouble
154+
155+
val approxCount = aggregator(data.map(int2Bytes(_))).approximateSize.estimate.toDouble
156+
scala.math.abs(exact - approxCount) / exact must be_<(3.5 * aveErrorOf(bits))
157+
})
158+
}
159+
160+
"work as an Aggregator and return size" in {
161+
List(5, 7, 10).foreach(bits => {
162+
val aggregator = HyperLogLogAggregator.sizeAggregator(bits)
163+
val data = (0 to 10000).map { i => r.nextInt(1000) }
164+
val exact = exactCount(data).toDouble
165+
166+
val estimate = aggregator(data.map(int2Bytes(_)))
167+
scala.math.abs(exact - estimate) / exact must be_<(3.5 * aveErrorOf(bits))
168+
})
169+
}
170+
149171
def verifySerialization(h : HLL) {
150172
fromBytes(toBytes(h)) must be_==(h)
151173
fromByteBuffer(java.nio.ByteBuffer.wrap(toBytes(h))) must be_==(h)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.twitter.algebird
2+
3+
import org.specs._
4+
5+
class MinMaxAggregatorTest extends Specification {
6+
val data = List(1, 3, 5, 0, 7, 6)
7+
8+
sealed trait TestElementParent
9+
case object TestElementA extends TestElementParent
10+
case object TestElementB extends TestElementParent
11+
case object TestElementC extends TestElementParent
12+
13+
implicit val testOrdering = Ordering.fromLessThan[TestElementParent]((x, y) => (x, y) match {
14+
case (TestElementA, TestElementA) => false
15+
case (TestElementA, _) => true
16+
case (TestElementB, TestElementB) => false
17+
case (TestElementB, TestElementA) => false
18+
case (TestElementB, TestElementC) => true
19+
case (TestElementC, _) => false
20+
})
21+
22+
val data2 = List(TestElementC, TestElementA, TestElementB)
23+
24+
"MinAggregator" should {
25+
"produce the minimum value" in {
26+
val agg = Min.aggregator[Int]
27+
agg(data) must be_==(0)
28+
29+
val agg2 = Min.aggregator[TestElementParent]
30+
agg2(data2) must be_==(TestElementA)
31+
}
32+
}
33+
34+
"MaxAggregator" should {
35+
"produce the maximum value" in {
36+
val agg = Max.aggregator[Int]
37+
agg(data) must be_==(7)
38+
39+
val agg2 = Max.aggregator[TestElementParent]
40+
agg2(data2) must be_==(TestElementC)
41+
}
42+
}
43+
}

algebird-test/src/test/scala/com/twitter/algebird/MomentsGroupTest.scala

+19-1
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,23 @@ class MomentsGroupTest extends Specification {
6060

6161
val m2 = getMoments(List(1, 1, 1, 2, 3))
6262
testApproxEq(m2.kurtosis, -0.921875)
63-
}
63+
}
64+
65+
"Moments can be aggregated" in {
66+
val m1 = MomentsAggregator(List(1, 2, 3, 4, 5))
67+
testApproxEq(m1.count, 5)
68+
testApproxEq(m1.mean , 3)
69+
testApproxEq(m1.variance, 2)
70+
testApproxEq(m1.skewness, 0)
71+
testApproxEq(m1.kurtosis, -1.3)
72+
73+
val m2 = MomentsAggregator(List(1, 1, 1, 2, 3))
74+
testApproxEq(m2.count, 5)
75+
testApproxEq(m2.mean , 1.6)
76+
testApproxEq(m2.variance, 0.64)
77+
testApproxEq(m2.skewness, 0.84375)
78+
testApproxEq(m2.kurtosis, -0.921875)
79+
80+
81+
}
6482
}

algebird-test/src/test/scala/com/twitter/algebird/SketchMapTest.scala

+14
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,19 @@ class SketchMapTest extends Specification {
130130
val sm4 = monoid.plus(sm3, monoid.create(Seq((100, 100L), (200, 30L), (200, 20L), (200, 10L))))
131131
sm4.heavyHitters must be_==(List((100, 5L), (200, 10L)))
132132
}
133+
134+
"work as an Aggregator" in {
135+
val data = Seq((1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L))
136+
137+
val sm1 = SketchMap.aggregator[Int, Long](EPS, DELTA, SEED, 5).apply(data)
138+
val sm2 = SketchMap.aggregator[Int, Long](EPS, DELTA, SEED, 3).apply(data)
139+
val sm3 = SketchMap.aggregator[Int, Long](EPS, DELTA, SEED, 1).apply(data)
140+
val sm4 = SketchMap.aggregator[Int, Long](EPS, DELTA, SEED, 0).apply(data)
141+
142+
sm1.heavyHitterKeys must be_==(List(5, 4, 3, 2, 1))
143+
sm2.heavyHitterKeys must be_==(List(5, 4, 3))
144+
sm3.heavyHitterKeys must be_==(List(5))
145+
sm4.heavyHitterKeys must be_==(List.empty[Int])
146+
}
133147
}
134148
}

0 commit comments

Comments
 (0)