diff --git a/CHANGES.md b/CHANGES.md index 1357d2003..014552ea8 100755 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,7 @@ * Add `metricsLaws[T]` to `BaseProperties` in `algebird-test`: https://github.com/twitter/algebird/pull/584 * Modify generated `Tuple2Monoid`, etc to extend `TupleNSemigroup`, giving subclasses access to efficient `sumOption`: https://github.com/twitter/algebird/pull/585 * optimize `Generated{Abstract,Product}Algebra.sumOption` with benchmarking https://github.com/twitter/algebird/pull/591 +* Add an efficient `sumOption`, `+`, `-` and docs to `AveragedValue`: https://github.com/twitter/algebird/pull/589 ### Version 0.12.2 ### diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AveragedValueBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AveragedValueBenchmark.scala new file mode 100644 index 000000000..fa22dd374 --- /dev/null +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AveragedValueBenchmark.scala @@ -0,0 +1,36 @@ +package com.twitter.algebird +package benchmark + +import scala.util.Random +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh.infra.Blackhole + +import scala.math._ + +object AveragedValueBenchmark { + @State(Scope.Benchmark) + class AVState { + @Param(Array("10000")) + var numElements: Int = 0 + + var inputData: Seq[AveragedValue] = _ + + @Setup(Level.Trial) + def setup(): Unit = { + inputData = Seq.fill(numElements)(AveragedValue(Random.nextInt(1000).toLong)) + } + } +} + +class AveragedValueBenchmark { + import AveragedValueBenchmark._ + import AveragedGroup.{ plus, sumOption } + + @Benchmark + def timePlus(state: AVState, bh: Blackhole) = + bh.consume(state.inputData.reduce(plus(_, _))) + + @Benchmark + def timeSumOption(state: AVState, bh: Blackhole) = + bh.consume(sumOption(state.inputData)) +} diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/HLLBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/HLLBenchmark.scala index af241915b..279188f16 100644 --- a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/HLLBenchmark.scala +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/HLLBenchmark.scala @@ -1,15 +1,15 @@ -package com.twitter.algebird.benchmark +package com.twitter.algebird +package benchmark -import com.twitter.algebird._ import scala.util.Random import com.twitter.bijection._ import com.twitter.algebird.util._ -import java.util.concurrent.TimeUnit import org.openjdk.jmh.annotations._ import org.openjdk.jmh.infra.Blackhole import scala.math._ + class OldMonoid(bits: Int) extends HyperLogLogMonoid(bits) { import HyperLogLog._ diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AveragedValue.scala b/algebird-core/src/main/scala/com/twitter/algebird/AveragedValue.scala index 3b6c69aa8..656c9d06e 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AveragedValue.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AveragedValue.scala @@ -16,60 +16,162 @@ limitations under the License. package com.twitter.algebird +/** + * Tracks the count and mean value of Doubles in a data stream. + * + * Adding two instances of [[AveragedValue]] with [[+]] + * is equivalent to taking an average of the two streams, with each + * stream weighted by its count. + * + * The mean calculation uses a numerically stable online algorithm + * suitable for large numbers of records, similar to Chan et. al.'s + * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm + * parallel variance algorithm on Wikipedia]]. As long as your count + * doesn't overflow a Long, the mean calculation won't overflow. + * + * @see [[MomentsGroup.getCombinedMean]] for implementation of [[+]] + * @param count the number of aggregated items + * @param value the average value of all aggregated items + */ +case class AveragedValue(count: Long, value: Double) { + /** + * Returns a copy of this instance with a negative value. Note that + * + * {{{ + * a + -b == a - b + * }}} + */ + def unary_- : AveragedValue = copy(count = -count) + + /** + * Averages this instance with the *opposite* of the supplied + * [[AveragedValue]] instance, effectively subtracting out that + * instance's contribution to the mean. + * + * @param r the instance to subtract + * @return an instance with `r`'s stream subtracted out + */ + def -(r: AveragedValue): AveragedValue = AveragedGroup.minus(this, r) + + /** + * Averages this instance with another [[AveragedValue]] instance. + * @param r the other instance + * @return an instance representing the mean of this instance and `r`. + */ + def +(r: AveragedValue): AveragedValue = AveragedGroup.plus(this, r) + + /** + * Returns a new instance that averages `that` into this instance. + * + * @param that value to average into this instance + * @return an instance representing the mean of this instance and `that`. + */ + def +(that: Double): AveragedValue = + AveragedValue( + count + 1L, + MomentsGroup.getCombinedMean(count, value, 1L, that)) + + /** + * Returns a new instance that averages `that` into this instance. + * + * @param that value to average into this instance + * @return an instance representing the mean of this instance and `that`. + */ + def +[N](that: N)(implicit num: Numeric[N]): AveragedValue = + this + num.toDouble(that) +} + +/** + * Provides a set of operations needed to create and use + * [[AveragedValue]] instances. + */ object AveragedValue { + /** implicit instance of [[Group]][AveragedValue] */ implicit val group = AveragedGroup + + /** + * Returns an [[Aggregator]] that uses [[AveragedValue]] to + * calculate the mean of all `Double` values in the stream. Each + * Double value receives a count of 1 during aggregation. + */ def aggregator: Aggregator[Double, AveragedValue, Double] = Averager + + /** + * Returns an [[Aggregator]] that uses [[AveragedValue]] to + * calculate the mean of all values in the stream. Each numeric + * value receives a count of `1` during aggregation. + * + * @tparam N numeric type to convert into `Double` + */ def numericAggregator[N](implicit num: Numeric[N]): MonoidAggregator[N, AveragedValue, Double] = Aggregator.prepareMonoid { n: N => AveragedValue(num.toDouble(n)) } .andThenPresent(_.value) - def apply[V <% Double](v: V) = new AveragedValue(1L, v) - def apply[V <% Double](c: Long, v: V) = new AveragedValue(c, v) -} - -case class AveragedValue(count: Long, value: Double) + /** + * Creates [[AveragedValue]] with a value of `v` and a count of 1. + * + * @tparam V type with an implicit conversion to Double + */ + def apply[V <% Double](v: V): AveragedValue = apply(1L, v) -object AveragedGroup extends Group[AveragedValue] { - // When combining averages, if the counts sizes are too close we should use a different - // algorithm. This constant defines how close the ratio of the smaller to the total count - // can be: - private val STABILITY_CONSTANT = 0.1 /** - * Uses a more stable online algorithm which should - * be suitable for large numbers of records - * similar to: - * http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm + * Creates an [[AveragedValue]] with a count of of `c` and a value + * of `v`. + * + * @tparam V type with an implicit conversion to Double */ + def apply[V <% Double](c: Long, v: V): AveragedValue = new AveragedValue(c, v) +} + +/** + * [[Group]] implementation for [[AveragedValue]]. + * + * @define T `AveragedValue` + */ +object AveragedGroup extends Group[AveragedValue] { + import MomentsGroup.getCombinedMean + val zero = AveragedValue(0L, 0.0) override def isNonZero(av: AveragedValue) = (av.count != 0L) - override def negate(av: AveragedValue) = AveragedValue(-av.count, av.value) - - def plus(cntAve1: AveragedValue, cntAve2: AveragedValue): AveragedValue = { - val (big, small) = if (cntAve1.count >= cntAve2.count) - (cntAve1, cntAve2) - else - (cntAve2, cntAve1) - val n = big.count - val k = small.count - val newCnt = n + k - if (newCnt == n) { - // Handle zero without allocation - big - } else if (newCnt == 0L) { - zero - } else { - val an = big.value - val ak = small.value - val scaling = k.toDouble / newCnt - // a_n + (a_k - a_n)*(k/(n+k)) is only stable if n is not approximately k - val newAve = if (scaling < STABILITY_CONSTANT) (an + (ak - an) * scaling) else (n * an + k * ak) / newCnt - new AveragedValue(newCnt, newAve) + override def negate(av: AveragedValue) = -av + + /** + * Optimized implementation of [[plus]]. Uses internal mutation to + * combine the supplied [[AveragedValue]] instances without creating + * intermediate objects. + */ + override def sumOption(iter: TraversableOnce[AveragedValue]): Option[AveragedValue] = + if (iter.isEmpty) None + else { + var count = 0L + var average = 0.0 + iter.foreach { + case AveragedValue(c, v) => + average = getCombinedMean(count, average, c, v) + count += c + } + Some(AveragedValue(count, average)) } + + /** + * @inheritdoc + * @see [[AveragedValue.+]] for the implementation + */ + def plus(l: AveragedValue, r: AveragedValue): AveragedValue = { + val n = l.count + val k = r.count + val newAve = getCombinedMean(n, l.value, k, r.value) + AveragedValue(n + k, newAve) } } +/** + * [[Aggregator]] that uses [[AveragedValue]] to calculate the mean + * of all `Double` values in the stream. Each Double value receives a + * count of 1 during aggregation. + */ object Averager extends MonoidAggregator[Double, AveragedValue, Double] { val monoid = AveragedGroup def prepare(value: Double) = AveragedValue(value) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/First.scala b/algebird-core/src/main/scala/com/twitter/algebird/First.scala index 8e45a4fd3..bc73a7a7a 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/First.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/First.scala @@ -16,32 +16,63 @@ limitations under the License. package com.twitter.algebird /** - * First tracks the "most recent" item by the order in which items - * are seen. + * Tracks the "least recent", or earliest, wrapped instance of `T` by + * the order in which items are seen. + * + * @param get wrapped instance of `T` */ case class First[@specialized(Int, Long, Float, Double) +T](get: T) { + /** + * Returns this instance, always. + * + * @param r ignored instance of `First[U]` + */ def +[U >: T](r: First[U]): First[U] = this } +/** + * Provides a set of operations and typeclass instances needed to use + * [[First]] instances. + */ object First extends FirstInstances { + /** + * Returns an [[Aggregator]] that selects the first instance of `T` + * in the aggregated stream. + */ def aggregator[T]: FirstAggregator[T] = FirstAggregator() } private[algebird] sealed abstract class FirstInstances { - def firstSemigroup[T] = new Semigroup[T] { - def plus(l: T, r: T): T = l + /** + * Returns a [[Semigroup]] instance with a `plus` implementation + * that always returns the first (ie, the left) `T` argument. + * + * This semigroup's `sumOption` is efficient; it only selects the + * head of the `TraversableOnce` instance, leaving the rest + * untouched. + */ + def firstSemigroup[T]: Semigroup[T] = + new Semigroup[T] { + def plus(l: T, r: T): T = l - override def sumOption(iter: TraversableOnce[T]): Option[T] = - if (iter.isEmpty) None else Some(iter.toIterator.next) - } + override def sumOption(iter: TraversableOnce[T]): Option[T] = + if (iter.isEmpty) None else Some(iter.toIterator.next) + } + /** + * Returns a [[Semigroup]] instance for [[First]][T]. The `plus` + * implementation always returns the first (ie, the left) `First[T]` + * argument. + */ implicit def semigroup[T]: Semigroup[First[T]] = firstSemigroup[First[T]] } +/** + * [[Aggregator]] that selects the first instance of `T` in the + * aggregated stream. + */ case class FirstAggregator[T]() extends Aggregator[T, T, T] { def prepare(v: T) = v - val semigroup: Semigroup[T] = First.firstSemigroup[T] - def present(v: T) = v } diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Last.scala b/algebird-core/src/main/scala/com/twitter/algebird/Last.scala index 772d257b3..b862e890c 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Last.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Last.scala @@ -16,23 +16,53 @@ limitations under the License. package com.twitter.algebird /** - * Last tracks the "most recent" item by the order in which items are - * seen. + * Tracks the "most recent", or last, wrapped instance of `T` by the + * order in which items are seen. + * + * @param get wrapped instance of `T` */ case class Last[@specialized(Int, Long, Float, Double) +T](get: T) { + /** + * Returns the argument `r`, always. + * + * @param r returned of `Last[U]` + */ def +[U >: T](r: Last[U]): Last[U] = r } +/** + * Provides a set of operations and typeclass instances needed to use + * [[Last]] instances. + */ object Last extends LastInstances { + /** + * Returns an [[Aggregator]] that selects the last instance of `T` + * in the aggregated stream. + */ def aggregator[T]: LastAggregator[T] = LastAggregator() } private[algebird] sealed abstract class LastInstances { - implicit def semigroup[T]: Semigroup[Last[T]] = Semigroup.from { (l, r) => r } + /** + * Returns a [[Semigroup]] instance with a `plus` implementation + * that always returns the last (ie, the right) `T` argument. + */ + def lastSemigroup[T]: Semigroup[T] = Semigroup.from { (l, r) => r } + + /** + * Returns a [[Semigroup]] instance for [[Last]][T]. The `plus` + * implementation always returns the last (ie, the right) `Last[T]` + * argument. + */ + implicit def semigroup[T]: Semigroup[Last[T]] = lastSemigroup[Last[T]] } +/** + * [[Aggregator]] that selects the last instance of `T` in the + * aggregated stream. + */ case class LastAggregator[T]() extends Aggregator[T, T, T] { def prepare(v: T) = v - val semigroup: Semigroup[T] = Semigroup.from { (l: T, r: T) => r } + val semigroup: Semigroup[T] = Last.lastSemigroup[T] def present(v: T) = v } diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Max.scala b/algebird-core/src/main/scala/com/twitter/algebird/Max.scala index 9418bab09..ffc035770 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Max.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Max.scala @@ -17,47 +17,101 @@ package com.twitter.algebird import scala.annotation.tailrec -// To use the MaxSemigroup wrap your item in Max +/** + * Tracks the maximum wrapped instance of some ordered type `T`. + * + * [[Max]][T] is a [[Semigroup]] for all types `T`. If `T` has some + * minimum element (`Long` has `Long.MinValue`, for example), then + * [[Max]][T] is a [[Monoid]]. + * + * @param get wrapped instance of `T` + */ case class Max[@specialized(Int, Long, Float, Double) +T](get: T) { + /** + * If this instance wraps a larger `T` than `r`, returns this + * instance, else returns `r`. + * + * @param r instance of `Max[U]` for comparison + */ def max[U >: T](r: Max[U])(implicit ord: Ordering[U]): Max[U] = Max.ordering.max(this, r) + + /** + * Identical to [[max]]. + * + * @param r instance of `Max[U]` for comparison + */ def +[U >: T](r: Max[U])(implicit ord: Ordering[U]): Max[U] = max(r) } +/** + * Provides a set of operations and typeclass instances needed to use + * [[Max]] instances. + */ object Max extends MaxInstances { + /** + * Returns an [[Aggregator]] that selects the maximum instance of an + * ordered type `T` in the aggregated stream. + */ def aggregator[T](implicit ord: Ordering[T]): MaxAggregator[T] = MaxAggregator()(ord) - // TODO: Note that this is a semilattice, for when we merge - // typelevel/algebra. + /** + * Returns a [[Semigroup]] instance with a `plus` implementation + * that always returns the maximum `T` argument. + * + * @todo Note that this is a semilattice, for when we merge typelevel/algebra. + */ def maxSemigroup[T](implicit ord: Ordering[T]): Semigroup[T] = Semigroup.from { (l: T, r: T) => ord.max(l, r) } } private[algebird] sealed abstract class MaxInstances { implicit def equiv[T](implicit eq: Equiv[T]): Equiv[Max[T]] = Equiv.by(_.get) + implicit def ordering[T: Ordering]: Ordering[Max[T]] = Ordering.by(_.get) private[this] def plus[T](implicit ord: Ordering[T]) = { (l: Max[T], r: Max[T]) => if (ord.gteq(l.get, r.get)) l else r } - // Zero should have the property that it <= all T + /** + * Returns a [[Monoid]] instance for [[Max]][T] that combines + * instances using [[Max.max]] and uses `zero` for its identity. + * + * @param zero identity of the returned [[Monoid]] instance + * @note `zero` must be `<=` every element of `T` for the returned instance to be lawful. + */ def monoid[T: Ordering](zero: => T): Monoid[Max[T]] = Monoid.from(Max(zero))(plus) - // There's no need to override `sumOption`, since the default - // implementation does no allocation other than the outer `Option` - // and `plus` doesn't do any allocation. - implicit def semigroup[T: Ordering]: Semigroup[Max[T]] = Semigroup.from(plus) - - implicit def ordering[T: Ordering]: Ordering[Max[T]] = Ordering.by(_.get) - + /** + * Returns a [[Semigroup]] instance for [[Max]][T]. The `plus` + * implementation always returns the maximum `Max[T]` argument. + */ + implicit def semigroup[T: Ordering]: Semigroup[Max[T]] = + // There's no need to override `sumOption`, since the default + // implementation does no allocation other than the outer `Option` + // and `plus` doesn't do any allocation. + Semigroup.from(plus) + + /** [[Monoid]] for [[Max]][Int] with `zero == Int.MinValue` */ implicit def intMonoid: Monoid[Max[Int]] = monoid(Int.MinValue) + + /** [[Monoid]] for [[Max]][Long] with `zero == Long.MinValue` */ implicit def longMonoid: Monoid[Max[Long]] = monoid(Long.MinValue) + + /** [[Monoid]] for [[Max]][Double] with `zero == Double.MinValue` */ implicit def doubleMonoid: Monoid[Max[Double]] = monoid(Double.MinValue) + + /** [[Monoid]] for [[Max]][Float] with `zero == Float.MinValue` */ implicit def floatMonoid: Monoid[Max[Float]] = monoid(Float.MinValue) - // These have a lower bound, but not an upperbound, so the Max forms a monoid: + /** [[Monoid]] for [[Max]][String] with `zero == ""` */ implicit def stringMonoid: Monoid[Max[String]] = monoid("") + /** + * Returns a [[Monoid]] instance for `Max[List[T]]` that compares + * lists first by length and then element-wise by `T`, and returns + * the maximum value. + */ implicit def listMonoid[T: Ordering]: Monoid[Max[List[T]]] = monoid[List[T]](Nil)( new Ordering[List[T]] { @tailrec @@ -76,7 +130,7 @@ private[algebird] sealed abstract class MaxInstances { // TODO: Replace with // cast.kernel.instances.StaticMethods.iteratorCompare when we // merge with cats. - def iteratorCompare[T](xs: Iterator[T], ys: Iterator[T])(implicit ord: Ordering[T]): Int = { + private def iteratorCompare[T](xs: Iterator[T], ys: Iterator[T])(implicit ord: Ordering[T]): Int = { while (true) { if (xs.hasNext) { if (ys.hasNext) { @@ -94,6 +148,11 @@ private[algebird] sealed abstract class MaxInstances { 0 } + /** + * Returns a [[Monoid]] instance for `Max[Vector[T]]` that compares + * lists first by length and then element-wise by `T`, and returns + * the maximum value. + */ implicit def vectorMonoid[T: Ordering]: Monoid[Max[Vector[T]]] = monoid[Vector[T]](Vector.empty[T])( new Ordering[Vector[T]] { @@ -103,6 +162,11 @@ private[algebird] sealed abstract class MaxInstances { } }) + /** + * Returns a [[Monoid]] instance for `Max[Stream[T]]` that compares + * lists first by length and then element-wise by `T`, and returns + * the maximum value. + */ implicit def streamMonoid[T: Ordering]: Monoid[Max[Stream[T]]] = monoid[Stream[T]](Stream.empty[T])( new Ordering[Stream[T]] { @@ -113,6 +177,10 @@ private[algebird] sealed abstract class MaxInstances { }) } +/** + * [[Aggregator]] that selects the maximum instance of `T` in the + * aggregated stream. + */ case class MaxAggregator[T](implicit ord: Ordering[T]) extends Aggregator[T, T, T] { def prepare(v: T) = v val semigroup = Max.maxSemigroup[T] diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Min.scala b/algebird-core/src/main/scala/com/twitter/algebird/Min.scala index 372198c3e..c978021c5 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Min.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Min.scala @@ -15,39 +15,92 @@ limitations under the License. */ package com.twitter.algebird -// To use the MinSemigroup wrap your item in a Min object +/** + * Tracks the minimum wrapped instance of some ordered type `T`. + * + * [[Min]][T] is a [[Semigroup]] for all types `T`. If `T` has some + * maximum element (`Long` has `Long.MaxValue`, for example), then + * [[Min]][T] is a [[Monoid]]. + * + * @param get wrapped instance of `T` + */ case class Min[@specialized(Int, Long, Float, Double) +T](get: T) { + /** + * If this instance wraps a smaller `T` than `r`, returns this + * instance, else returns `r`. + * + * @param r instance of `Min[U]` for comparison + */ def min[U >: T](r: Min[U])(implicit ord: Ordering[U]): Min[U] = Min.ordering.min(this, r) + + /** + * Identical to [[min]]. + * + * @param r instance of `Min[U]` for comparison + */ def +[U >: T](r: Min[U])(implicit ord: Ordering[U]): Min[U] = min(r) } +/** + * Provides a set of operations and typeclass instances needed to use + * [[Min]] instances. + */ object Min extends MinInstances { + /** + * Returns an [[Aggregator]] that selects the minimum instance of an + * ordered type `T` in the aggregated stream. + */ def aggregator[T](implicit ord: Ordering[T]): MinAggregator[T] = MinAggregator()(ord) + + /** + * Returns a [[Semigroup]] instance with a `plus` implementation + * that always returns the minimum `T` argument. + */ def minSemigroup[T](implicit ord: Ordering[T]): Semigroup[T] = Semigroup.from { (l: T, r: T) => ord.min(l, r) } } private[algebird] sealed abstract class MinInstances { implicit def equiv[T](implicit eq: Equiv[T]): Equiv[Min[T]] = Equiv.by(_.get) + implicit def ordering[T: Ordering]: Ordering[Min[T]] = Ordering.by(_.get) private[this] def plus[T](implicit ord: Ordering[T]) = { (l: Min[T], r: Min[T]) => if (ord.lteq(l.get, r.get)) l else r } - // Zero should have the property that it >= all T + /** + * Returns a [[Monoid]] instance for [[Min]][T] that combines + * instances using [[Min.min]] and uses `zero` for its identity. + * + * @param zero identity of the returned [[Monoid]] instance + * @note `zero` must be `>=` every element of `T` for the returned instance to be lawful. + */ def monoid[T: Ordering](zero: => T): Monoid[Min[T]] = Monoid.from(Min(zero))(plus) + /** + * Returns a [[Semigroup]] instance for [[Min]][T]. The `plus` + * implementation always returns the minimum `Min[T]` argument. + */ implicit def semigroup[T: Ordering]: Semigroup[Min[T]] = Semigroup.from(plus) - implicit def ordering[T: Ordering]: Ordering[Min[T]] = Ordering.by(_.get) - + /** [[Monoid]] for [[Min]][Int] with `zero == Int.MaxValue` */ implicit def intMonoid: Monoid[Min[Int]] = monoid(Int.MaxValue) + + /** [[Monoid]] for [[Min]][Long] with `zero == Long.MaxValue` */ implicit def longMonoid: Monoid[Min[Long]] = monoid(Long.MaxValue) + + /** [[Monoid]] for [[Min]][Double] with `zero == Double.MaxValue` */ implicit def doubleMonoid: Monoid[Min[Double]] = monoid(Double.MaxValue) + + /** [[Monoid]] for [[Min]][Float] with `zero == Float.MaxValue` */ implicit def floatMonoid: Monoid[Min[Float]] = monoid(Float.MaxValue) } +/** + * [[Aggregator]] that selects the minimum instance of `T` in the + * aggregated stream. + */ case class MinAggregator[T](implicit ord: Ordering[T]) extends Aggregator[T, T, T] { def prepare(v: T) = v val semigroup = Min.minSemigroup[T] diff --git a/algebird-core/src/main/scala/com/twitter/algebird/MomentsGroup.scala b/algebird-core/src/main/scala/com/twitter/algebird/MomentsGroup.scala index e5b600374..77f84abf3 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/MomentsGroup.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/MomentsGroup.scala @@ -57,9 +57,9 @@ object Moments { // Create a Moments object given a single value. This is useful for // initializing moment calculations at the start of a stream. - def apply[V <% Double](value: V) = new Moments(1L, value, 0, 0, 0) + def apply[V <% Double](value: V): Moments = new Moments(1L, value, 0, 0, 0) - def apply[V <% Double](m0: Long, m1: V, m2: V, m3: V, m4: V) = + def apply[V <% Double](m0: Long, m1: V, m2: V, m3: V, m4: V): Moments = new Moments(m0, m1, m2, m3, m4) } @@ -68,16 +68,37 @@ object Moments { */ object MomentsGroup extends Group[Moments] { - // When combining averages, if the counts sizes are too close we should use a different - // algorithm. This constant defines how close the ratio of the smaller to the total count - // can be. + /** + * When combining averages, if the counts sizes are too close we + * should use a different algorithm. This constant defines how + * close the ratio of the smaller to the total count can be: + */ private val STABILITY_CONSTANT = 0.1 + /** + * Given two streams of doubles (n, an) and (k, ak) of form (count, + * mean), calculates the mean of the combined stream. + * + * Uses a more stable online algorithm which should be suitable for + * large numbers of records similar to: + * http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm + */ + def getCombinedMean(n: Long, an: Double, k: Long, ak: Double): Double = + if (n < k) getCombinedMean(k, ak, n, an) + else (n + k) match { + case 0L => 0.0 + case newCount if (newCount == n) => an + case newCount => + val scaling = k.toDouble / newCount + // a_n + (a_k - a_n)*(k/(n+k)) is only stable if n is not approximately k + if (scaling < STABILITY_CONSTANT) (an + (ak - an) * scaling) + else (n * an + k * ak) / newCount + } + val zero = Moments(0L, 0.0, 0.0, 0.0, 0.0) - override def negate(a: Moments): Moments = { + override def negate(a: Moments): Moments = Moments(-a.count, a.m1, -a.m2, -a.m3, -a.m4) - } // Combines the moment calculations from two streams. // See http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics @@ -105,39 +126,6 @@ object MomentsGroup extends Group[Moments] { Moments(countCombined, meanCombined, m2, m3, m4) } - - /** - * Given two streams of doubles A and B, with the specified counts and means, - * calculate the mean of the combined stream. - */ - def getCombinedMean(countA: Long, meanA: Double, countB: Long, meanB: Double): Double = { - val (big, small) = - if (math.abs(countA) >= math.abs(countB)) - ((countA, meanA), (countB, meanB)) - else - ((countB, meanB), (countA, meanA)) - - val (countBig, meanBig) = big - val (countSmall, meanSmall) = small - - if (countSmall == 0) { - meanBig - } else { - val countCombined = countSmall + countBig - val scaling = countSmall.toDouble / countCombined - val meanCombined = - if (math.abs(scaling) < STABILITY_CONSTANT) - // This formula for the combined mean is only stable if - // countA is not approximately countB. - meanBig + (meanSmall - meanBig) * scaling - else - // Use this more stable formulation if the sizes of the two streams - // are close. - (countBig * meanBig + countSmall * meanSmall) / countCombined - - meanCombined - } - } } object MomentsAggregator extends MonoidAggregator[Double, Moments, Moments] { diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Monoid.scala b/algebird-core/src/main/scala/com/twitter/algebird/Monoid.scala index ac0c8e9b1..9072ea2cb 100755 --- a/algebird-core/src/main/scala/com/twitter/algebird/Monoid.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Monoid.scala @@ -29,9 +29,9 @@ import scala.collection.{ Map => ScMap } * Monoid (take a deep breath, and relax about the weird name): * This is a semigroup that has an additive identity (called zero), such that a+0=a, 0+a=a, for every a */ - @implicitNotFound(msg = "Cannot find Monoid type class for ${T}") trait Monoid[@specialized(Int, Long, Float, Double) T] extends Semigroup[T] { + /** Returns the identity element of `$T` for [[plus]]. */ def zero: T //additive identity def isNonZero(v: T): Boolean = (v != zero) def assertNotZero(v: T) { diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Semigroup.scala b/algebird-core/src/main/scala/com/twitter/algebird/Semigroup.scala index 6b7565c48..0b4f53985 100755 --- a/algebird-core/src/main/scala/com/twitter/algebird/Semigroup.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Semigroup.scala @@ -25,14 +25,36 @@ import scala.annotation.{ implicitNotFound, tailrec } import macros.caseclass._ /** - * Semigroup: - * This is a class with a plus method that is associative: a+(b+c) = (a+b)+c + * A semigroup is any type `T` with an associative operation (`plus`): + * + * {{{ + * a plus (b plus c) = (a plus b) plus c + * }}} + * + * Example instances: + * - `Semigroup[Int]`: `plus` `Int#+` + * - `Semigroup[List[T]]`: `plus` is `List#++` + * + * @define T T */ @implicitNotFound(msg = "Cannot find Semigroup type class for ${T}") trait Semigroup[@specialized(Int, Long, Float, Double) T] extends java.io.Serializable { + /** + * Combines two `$T` instances associatively. + * + * @return result of combining `l` and `r` + */ def plus(l: T, r: T): T + /** - * override this if there is a faster way to do this sum than reduceLeftOption on plus + * Returns an instance of `$T` calculated by summing all instances in + * `iter` in one pass. Returns `None` if `iter` is empty, else + * `Some[$T]`. + * + * @param iter instances of `$T` to be combined + * @return `None` if `iter` is empty, else an option value containing the summed `$T` + * @note Override if there is a faster way to compute this sum than + * `iter.reduceLeftOption` using [[plus]]. */ def sumOption(iter: TraversableOnce[T]): Option[T] = iter.reduceLeftOption { plus(_, _) } @@ -47,6 +69,8 @@ abstract class AbstractSemigroup[T] extends Semigroup[T] * wrong, use Left. plus does the normal thing for plus(Right, Right), or plus(Left, Left), * but if exactly one is Left, we return that value (to keep the error condition). * Typically, the left value will be a string representing the errors. + * + * @define T Either[L, R] */ class EitherSemigroup[L, R](implicit semigroupl: Semigroup[L], semigroupr: Semigroup[R]) extends Semigroup[Either[L, R]] { diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AveragedValueLaws.scala b/algebird-test/src/test/scala/com/twitter/algebird/AveragedValueLaws.scala index 73f56c147..c38d30105 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AveragedValueLaws.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AveragedValueLaws.scala @@ -2,15 +2,70 @@ package com.twitter.algebird import com.twitter.algebird.BaseProperties._ import com.twitter.algebird.scalacheck.arbitrary._ +import com.twitter.algebird.scalacheck.NonEmptyVector +import org.scalacheck.Arbitrary +import org.scalacheck.Prop.forAll class AveragedValueLaws extends CheckProperties { - property("AveragedValue Group laws") { + def avg[T](v: Vector[T])(implicit num: Numeric[T]): Double = { + val items = v.map(num.toDouble(_)) + val sum = items.sum + val count = items.size + sum / count + } + + property("AveragedValue is a group and commutative") { implicit val equiv: Equiv[AveragedValue] = Equiv.fromFunction { (avl, avr) => ((avl.count == 0L) && (avr.count == 0L)) || { approxEq(1e-10)(avl.value, avr.value) && (avl.count == avr.count) } } - groupLawsEquiv[AveragedValue] + groupLawsEquiv[AveragedValue] && isCommutativeEquiv[AveragedValue] + } + + property("AveragedValue.aggregator returns the average") { + forAll { v: NonEmptyVector[Double] => + approxEq(1e-10)(avg(v.items), AveragedValue.aggregator(v.items)) + } + } + + property("AveragedValue instances subtract") { + forAll { (l: AveragedValue, r: AveragedValue) => + l + -l == Monoid.zero[AveragedValue] && + l - l == Monoid.zero[AveragedValue] && + l - r == l + -r + } + } + + property("AveragedValue can absorb numbers directly") { + forAll { (base: AveragedValue, x: Long) => + (base + AveragedValue(x)) == (base + x) + } + } + + property("AveragedValue works by + or sumOption") { + forAll { v: NonEmptyVector[Double] => + val avgs = v.items.map(AveragedValue(_)) + val sumOpt = Semigroup.sumOption[AveragedValue](avgs).get + val byPlus = avgs.reduce(_ + _) + + approxEq(1e-10)(avg(v.items), sumOpt.value) && + approxEq(1e-10)(sumOpt.value, byPlus.value) + } + } + + def numericAggregatorTest[T: Numeric: Arbitrary] = + forAll { v: NonEmptyVector[T] => + val averaged = AveragedValue.numericAggregator[T].apply(v.items) + approxEq(1e-10)(avg(v.items), averaged) + } + + property("AveragedValue.numericAggregator averages BigInt") { + numericAggregatorTest[BigInt] + } + + property("AveragedValue.numericAggregator averages Long") { + numericAggregatorTest[Long] } } diff --git a/build.sbt b/build.sbt index 0dea99e1f..c1892204d 100644 --- a/build.sbt +++ b/build.sbt @@ -36,7 +36,6 @@ def docsSourcesAndProjects(sv: String): (Boolean, Seq[ProjectReference]) = algebirdCore, algebirdUtil, algebirdBijection, - algebirdBenchmark, algebirdSpark)) } @@ -166,6 +165,7 @@ lazy val algebird = Project( base = file("."), settings = sharedSettings) .settings(noPublishSettings) + .settings(coverageExcludedPackages := ";.*\\.benchmark\\..*") .aggregate( algebirdTest, algebirdCore, diff --git a/docs/src/main/tut/datatypes/averaged_value.md b/docs/src/main/tut/datatypes/averaged_value.md index 1d91651ff..2914ba098 100644 --- a/docs/src/main/tut/datatypes/averaged_value.md +++ b/docs/src/main/tut/datatypes/averaged_value.md @@ -8,8 +8,102 @@ scaladoc: "#com.twitter.algebird.AveragedValue" # Averaged Value -### Documentation Help +The `AveragedValue` data structure keeps track of the `count` and `mean` of a stream of numbers with a single pass over the data. The mean calculation uses a numerically stable online algorithm suitable for large numbers of records, similar to Chan et. al.'s [parallel variance algorithm on Wikipedia](http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm). As long as your count doesn't overflow a `Long`, the mean calculation won't overflow. + +You can build instances of `AveragedValue` from any numeric type: + +```tut:book +import com.twitter.algebird._ + +val longVal = AveragedValue(3L) +val doubleVal = AveragedValue(12.0) +val intVal = AveragedValue(15) +``` + +## Algebraic Properties + +Combining instances with `+` generates a new instance by adding the `count`s and averaging the `value`s: + +```tut:book +longVal + doubleVal +longVal + doubleVal + intVal +``` + +You can also add numbers directly to an `AveragedValue` instance: + +```tut:book +longVal + 12 +``` + +`AveragedValue` is a commutative group. This means you can add instances in any order: + +```tut:book +longVal + doubleVal == doubleVal + doubleVal +``` + +An `AveragedValue` with a count and value of `0` act as `Monoid.zero`: + +```tut:book +Monoid.zero[AveragedValue] +longVal + Monoid.zero[AveragedValue] == longVal +``` + +Subtracting `AveragedValue`s is the opposite of addition: + +```tut:book +intVal - longVal +intVal + doubleVal - doubleVal +``` + +### Stable Average Algorithm -We'd love your help fleshing out this documentation! You can edit this page in your browser by clicking [this link](https://github.com/twitter/algebird/edit/develop/docs/src/main/tut/datatypes/averaged_value.md). These links might be helpful: +Each `AveragedValue` instance represents a stream of data. `AveragedValue` uses Chan et. al.'s [parallel algorithm](http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm) to combine the mean values of each stream. Here's the calculation: + +```scala +// big and small are two AveragedValue instances +val deltaOfMeans = big.value - small.value +val newCount = big.count + small.count +val newMean = small.value + deltaOfMeans * (big.count / newCount) +``` + +As long as `newCount` stays within the bounds of `Long`, this calculation won't overflow for large `value`. + +The [Wikipedia presentation of this algorithm](http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm) points out that if both streams are close in size, the algorithm is "prone to loss of precision due to [catastrophic cancellation](https://en.wikipedia.org/wiki/Loss_of_significance)". + +`AveragedValue` guards against this instability by taking a weighted average of the two instances if the smaller of the two has a count greater than `10%` of the combined count. + +```scala +val newCount = big.count + small.count +(big.count * big.value + small.count * small.value) / newCount +``` + +## Aggregator + +`AveragedValue.aggregator` returns an `Aggregator` that uses `AveragedValue` to calculate the mean of all `Double` values in a stream. For example: + +```tut:book +val items = List[Double](1.0, 2.2, 3.3, 4.4, 5.5) +AveragedValue.aggregator(items) +``` + +`AveragedValue.numericAggregator` works the same way for any numeric type: + +```tut:book +val items = List[Int](1, 3, 5, 7) +AveragedValue.numericAggregator[Int].apply(items) +``` + +## Related Data Structures + +- [DecayedValue](decayed_value.html) calculates a decayed moving average of a stream; this allows you to approximate the mean of a bounded sliding window of the datastream. +- In addition to count and mean, [Moments](five_moments.html) allows you to keep track of the standard deviation, skewness and kurtosis of a stream with one pass over the data. + +### Links + +- Source: [AveragedValue.scala](https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/AveragedValue.scala) +- Tests: [AveragedValueLaws.scala](https://github.com/twitter/algebird/blob/develop/algebird-test/src/test/scala/com/twitter/algebird/AveragedValueLaws.scala) +- Scaladoc: [AveragedValue]({{site.baseurl}}/api#com.twitter.algebird.AveragedValue) + +### Documentation Help -- [AveragedValue.scala](https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/AveragedValue.scala) +We'd love your help fleshing out this documentation! You can edit this page in your browser by clicking [this link](https://github.com/twitter/algebird/edit/develop/docs/src/main/tut/datatypes/averaged_value.md). diff --git a/docs/src/main/tut/datatypes/first_and_last.md b/docs/src/main/tut/datatypes/first_and_last.md index 308d8269a..20119d9f2 100644 --- a/docs/src/main/tut/datatypes/first_and_last.md +++ b/docs/src/main/tut/datatypes/first_and_last.md @@ -72,7 +72,7 @@ See [the docs on `Min` and `Max`](min_and_max.html) for more information. ### Links - Source: [First.scala](https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/First.scala) and [Last.scala](https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/Last.scala) -- Tests: [FirstSpec.scala](https://github.com/twitter/algebird/blob/develop/algebird-test/src/test/scala/com/twitter/algebird/FirstSpec.scala) and [LastSpec.scala](https://github.com/twitter/algebird/blob/develop/algebird-test/src/test/scala/com/twitter/algebird/LastSpec.scala) +- Tests: [FirstLaws.scala](https://github.com/twitter/algebird/blob/develop/algebird-test/src/test/scala/com/twitter/algebird/FirstLaws.scala) and [LastLaws.scala](https://github.com/twitter/algebird/blob/develop/algebird-test/src/test/scala/com/twitter/algebird/LastLaws.scala) - Scaladoc: [First]({{site.baseurl}}/api#com.twitter.algebird.First) and [Last]({{site.baseurl}}/api#com.twitter.algebird.Last) ### Documentation Help diff --git a/docs/src/main/tut/datatypes/min_and_max.md b/docs/src/main/tut/datatypes/min_and_max.md index 6154d8ffb..f081a0302 100644 --- a/docs/src/main/tut/datatypes/min_and_max.md +++ b/docs/src/main/tut/datatypes/min_and_max.md @@ -105,7 +105,7 @@ See [the docs on `First` and `Last`](first_and_last.html) for more information. ### Links - Source: [Min.scala](https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/Min.scala) and [Max.scala](https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/Max.scala) -- Tests: [MinSpec.scala](https://github.com/twitter/algebird/blob/develop/algebird-test/src/test/scala/com/twitter/algebird/MinSpec.scala) and [MaxSpec.scala](https://github.com/twitter/algebird/blob/develop/algebird-test/src/test/scala/com/twitter/algebird/MaxSpec.scala) +- Tests: [MinLaws.scala](https://github.com/twitter/algebird/blob/develop/algebird-test/src/test/scala/com/twitter/algebird/MinLaws.scala) and [MaxLaws.scala](https://github.com/twitter/algebird/blob/develop/algebird-test/src/test/scala/com/twitter/algebird/MaxLaws.scala) - Scaladoc: [Min]({{site.baseurl}}/api#com.twitter.algebird.Min) and [Max]({{site.baseurl}}/api#com.twitter.algebird.Max) ### Documentation Help