Skip to content

Coverage and documentation for AveragedValue #589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Add `metricsLaws[T]` to `BaseProperties` in `algebird-test`: https://github.com/twitter/algebird/pull/584
* Modify generated `Tuple2Monoid`, etc to extend `TupleNSemigroup`, giving subclasses access to efficient `sumOption`: https://github.com/twitter/algebird/pull/585
* Optimize `Generated{Abstract,Product}Algebra.sumOption` with benchmarking: https://github.com/twitter/algebird/pull/591
* Add an efficient `sumOption`, `+`, `-` and docs to `AveragedValue`: https://github.com/twitter/algebird/pull/589

### Version 0.12.2 ###

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.twitter.algebird
package benchmark

import scala.util.Random
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

import scala.math._

object AveragedValueBenchmark {
  /**
   * JMH state holding the input shared by every benchmark method in
   * [[AveragedValueBenchmark]].
   */
  @State(Scope.Benchmark)
  class AVState {
    /** Number of [[AveragedValue]] instances to generate; injected by JMH. */
    @Param(Array("10000"))
    var numElements: Int = 0

    /** Benchmark input; unset until `setup()` has run. */
    var inputData: Seq[AveragedValue] = _

    /**
     * Fills `inputData` with `numElements` instances, each wrapping a
     * random integer in `[0, 1000)` converted to `Long`.
     */
    @Setup(Level.Trial)
    def setup(): Unit =
      inputData = Seq.fill(numElements) {
        val sample = Random.nextInt(1000).toLong
        AveragedValue(sample)
      }
  }
}

/**
 * Compares pairwise reduction through `AveragedGroup.plus` against the
 * specialised `AveragedGroup.sumOption` on the same input.
 */
class AveragedValueBenchmark {
  import AveragedValueBenchmark._
  import AveragedGroup.{ plus, sumOption }

  /** Reduces the input by applying `plus` to successive pairs. */
  @Benchmark
  def timePlus(state: AVState, bh: Blackhole): Unit = {
    val folded = state.inputData.reduce { (acc, next) => plus(acc, next) }
    bh.consume(folded)
  }

  /** Sums the input with a single call to `sumOption`. */
  @Benchmark
  def timeSumOption(state: AVState, bh: Blackhole): Unit = {
    val summed = sumOption(state.inputData)
    bh.consume(summed)
  }
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.twitter.algebird.benchmark
package com.twitter.algebird
package benchmark

import com.twitter.algebird._
import scala.util.Random
import com.twitter.bijection._

import com.twitter.algebird.util._
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

import scala.math._

class OldMonoid(bits: Int) extends HyperLogLogMonoid(bits) {
import HyperLogLog._

Expand Down
174 changes: 138 additions & 36 deletions algebird-core/src/main/scala/com/twitter/algebird/AveragedValue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,162 @@ limitations under the License.

package com.twitter.algebird

/**
* Tracks the count and mean value of Doubles in a data stream.
*
* Adding two instances of [[AveragedValue]] with [[+]]
* is equivalent to taking an average of the two streams, with each
* stream weighted by its count.
*
* The mean calculation uses a numerically stable online algorithm
* suitable for large numbers of records, similar to Chan et al.'s
* [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
* parallel variance algorithm on Wikipedia]]. As long as your count
* doesn't overflow a Long, the mean calculation won't overflow.
*
* @see [[MomentsGroup.getCombinedMean]] for implementation of [[+]]
* @param count the number of aggregated items
* @param value the average value of all aggregated items
*/
case class AveragedValue(count: Long, value: Double) {
/**
 * Returns a copy of this instance whose `count` is negated; `value`
 * is carried over unchanged. Note that
 *
 * {{{
 * a + -b == a - b
 * }}}
 */
def unary_- : AveragedValue = new AveragedValue(-count, value)

/**
 * Removes the contribution of `r` from this instance by delegating
 * to `AveragedGroup.minus`.
 *
 * @param r the instance whose stream should be subtracted out
 * @return the result of `AveragedGroup.minus(this, r)`
 */
def -(r: AveragedValue): AveragedValue = {
  val remainder = AveragedGroup.minus(this, r)
  remainder
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while we are at it, should we add:

def +(that: Double): AveragedValue = AveragedValue(count + 1L, ....)`
def +[N](that: N)(implicit num: Numeric[N]): AveragedValue = this + (num.toDouble(that))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

/**
 * Merges this instance with `r` by delegating to
 * `AveragedGroup.plus`.
 *
 * @param r the instance to merge in
 * @return the result of `AveragedGroup.plus(this, r)`
 */
def +(r: AveragedValue): AveragedValue = {
  val merged = AveragedGroup.plus(this, r)
  merged
}

/**
 * Folds a single raw value into this instance, treating `that` as a
 * stream of count 1.
 *
 * @param that value to average into this instance
 * @return an instance with `count + 1` items whose mean is computed
 *         by `MomentsGroup.getCombinedMean`
 */
def +(that: Double): AveragedValue = {
  val mergedMean = MomentsGroup.getCombinedMean(count, value, 1L, that)
  AveragedValue(count + 1L, mergedMean)
}

/**
 * Folds a single numeric value into this instance after converting
 * it to `Double` with the supplied `Numeric[N]`.
 *
 * @tparam N numeric type of the value being added
 * @param that value to average into this instance
 * @return an instance representing the mean of this instance and `that`
 */
def +[N](that: N)(implicit num: Numeric[N]): AveragedValue = {
  val asDouble = num.toDouble(that)
  this + asDouble
}
}

/**
* Provides a set of operations needed to create and use
* [[AveragedValue]] instances.
*/
object AveragedValue {
/**
 * Implicit [[Group]] instance for [[AveragedValue]].
 *
 * The type is declared explicitly so that implicit resolution does
 * not depend on inferring it. `AveragedGroup.type` is the singleton
 * type that was previously inferred, so the static type is unchanged.
 */
implicit val group: AveragedGroup.type = AveragedGroup

/**
 * Returns the [[Averager]] singleton: an [[Aggregator]] from `Double`
 * to `Double` that uses [[AveragedValue]] as its intermediate type.
 * `Averager` prepares each input with the single-argument
 * `AveragedValue(value)` factory.
 */
def aggregator: Aggregator[Double, AveragedValue, Double] = Averager

/**
 * Builds a [[MonoidAggregator]] that averages a stream of `N`. Every
 * input is converted to `Double` and wrapped with the single-argument
 * `AveragedValue` factory; the aggregated result is presented as its
 * `value` field.
 *
 * @tparam N numeric type to convert into `Double`
 */
def numericAggregator[N](implicit num: Numeric[N]): MonoidAggregator[N, AveragedValue, Double] = {
  val counted = Aggregator.prepareMonoid { n: N => AveragedValue(num.toDouble(n)) }
  counted.andThenPresent(_.value)
}

def apply[V <% Double](v: V) = new AveragedValue(1L, v)
def apply[V <% Double](c: Long, v: V) = new AveragedValue(c, v)
}

case class AveragedValue(count: Long, value: Double)
/**
 * Creates an [[AveragedValue]] with a value of `v` and a count of 1
 * by forwarding to the two-argument `apply` with `1L` as the count.
 *
 * @tparam V type with an implicit conversion to Double
 * @param v the value to wrap
 */
def apply[V <% Double](v: V): AveragedValue = apply(1L, v)

object AveragedGroup extends Group[AveragedValue] {
// When combining averages, if the counts sizes are too close we should use a different
// algorithm. This constant defines how close the ratio of the smaller to the total count
// can be:
private val STABILITY_CONSTANT = 0.1
/**
* Uses a more stable online algorithm which should
* be suitable for large numbers of records
* similar to:
* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
* Creates an [[AveragedValue]] with a count of `c` and a value
* of `v`.
*
* @tparam V type with an implicit conversion to Double
*/
def apply[V <% Double](c: Long, v: V): AveragedValue = new AveragedValue(c, v)
}

/**
* [[Group]] implementation for [[AveragedValue]].
*
* @define T `AveragedValue`
*/
object AveragedGroup extends Group[AveragedValue] {
import MomentsGroup.getCombinedMean

/** Identity element: an average over zero items. */
val zero: AveragedValue = new AveragedValue(0L, 0.0)

/** An instance is non-zero exactly when its `count` is not `0L`; `value` is not consulted. */
override def isNonZero(av: AveragedValue): Boolean = av.count != 0L

override def negate(av: AveragedValue) = AveragedValue(-av.count, av.value)

def plus(cntAve1: AveragedValue, cntAve2: AveragedValue): AveragedValue = {
val (big, small) = if (cntAve1.count >= cntAve2.count)
(cntAve1, cntAve2)
else
(cntAve2, cntAve1)
val n = big.count
val k = small.count
val newCnt = n + k
if (newCnt == n) {
// Handle zero without allocation
big
} else if (newCnt == 0L) {
zero
} else {
val an = big.value
val ak = small.value
val scaling = k.toDouble / newCnt
// a_n + (a_k - a_n)*(k/(n+k)) is only stable if n is not approximately k
val newAve = if (scaling < STABILITY_CONSTANT) (an + (ak - an) * scaling) else (n * an + k * ak) / newCnt
new AveragedValue(newCnt, newAve)
override def negate(av: AveragedValue) = -av

/**
 * Sums every [[AveragedValue]] in `iter` in one pass. The running
 * count and mean are kept in two local variables and updated with
 * `getCombinedMean`, so no intermediate [[AveragedValue]] is
 * allocated per element.
 *
 * @return `None` if `iter` is empty, otherwise the combined instance
 */
override def sumOption(iter: TraversableOnce[AveragedValue]): Option[AveragedValue] =
  if (iter.nonEmpty) {
    var totalCount = 0L
    var runningMean = 0.0
    iter.foreach {
      case AveragedValue(itemCount, itemMean) =>
        // The mean must be merged before the count is advanced:
        // getCombinedMean needs the count as it was before this item.
        runningMean = getCombinedMean(totalCount, runningMean, itemCount, itemMean)
        totalCount += itemCount
    }
    Some(AveragedValue(totalCount, runningMean))
  } else None

/**
 * Combines two averages. The resulting count is the sum of both
 * counts; the resulting mean is computed by
 * `MomentsGroup.getCombinedMean` from each side's count and value.
 *
 * @param l left operand
 * @param r right operand
 */
def plus(l: AveragedValue, r: AveragedValue): AveragedValue =
  AveragedValue(
    l.count + r.count,
    getCombinedMean(l.count, l.value, r.count, r.value))
}

/**
* [[Aggregator]] that uses [[AveragedValue]] to calculate the mean
* of all `Double` values in the stream. Each Double value receives a
* count of 1 during aggregation.
*/
object Averager extends MonoidAggregator[Double, AveragedValue, Double] {
val monoid = AveragedGroup
def prepare(value: Double) = AveragedValue(value)
Expand Down
49 changes: 40 additions & 9 deletions algebird-core/src/main/scala/com/twitter/algebird/First.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,63 @@ limitations under the License.
package com.twitter.algebird

/**
* First tracks the "most recent" item by the order in which items
* are seen.
* Tracks the "least recent", or earliest, wrapped instance of `T` by
* the order in which items are seen.
*
* @param get wrapped instance of `T`
*/
case class First[@specialized(Int, Long, Float, Double) +T](get: T) {
/**
 * Returns this instance, always.
 *
 * The result is typed as `First[T]` rather than `First[U]`: the
 * method returns `this`, so the more precise element type is
 * available, and because `First` is covariant a `First[T]` can
 * still be used wherever a `First[U]` was expected.
 *
 * @tparam U element type of the ignored argument, a supertype of `T`
 * @param r ignored instance of `First[U]`
 * @return this instance
 */
def +[U >: T](r: First[U]): First[T] = this
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could actually make this return First[T], I think.

}

/**
 * Companion of [[First]]. Inherits its members from
 * `FirstInstances` and adds an [[Aggregator]] factory.
 */
object First extends FirstInstances {
  /**
   * Creates a [[FirstAggregator]] for element type `T`.
   *
   * @tparam T type of the aggregated items
   */
  def aggregator[T]: FirstAggregator[T] = FirstAggregator[T]()
}

private[algebird] sealed abstract class FirstInstances {
def firstSemigroup[T] = new Semigroup[T] {
def plus(l: T, r: T): T = l
/**
* Returns a [[Semigroup]] instance with a `plus` implementation
* that always returns the first (ie, the left) `T` argument.
*
* This semigroup's `sumOption` is efficient; it only selects the
* head of the `TraversableOnce` instance, leaving the rest
* untouched.
*/
def firstSemigroup[T]: Semigroup[T] =
new Semigroup[T] {
def plus(l: T, r: T): T = l

override def sumOption(iter: TraversableOnce[T]): Option[T] =
if (iter.isEmpty) None else Some(iter.toIterator.next)
}
override def sumOption(iter: TraversableOnce[T]): Option[T] =
if (iter.isEmpty) None else Some(iter.toIterator.next)
}

/**
* Returns a [[Semigroup]] instance for [[First]][T]. The `plus`
* implementation always returns the first (ie, the left) `First[T]`
* argument.
*/
implicit def semigroup[T]: Semigroup[First[T]] = firstSemigroup[First[T]]
}

/**
 * [[Aggregator]] whose input, intermediate and output types are all
 * `T`. Values pass through `prepare` and `present` unchanged;
 * combination is delegated to `First.firstSemigroup`.
 */
case class FirstAggregator[T]() extends Aggregator[T, T, T] {
  val semigroup: Semigroup[T] = First.firstSemigroup[T]

  /** Identity: the input is used as the intermediate value. */
  def prepare(v: T): T = v

  /** Identity: the intermediate value is returned as the result. */
  def present(v: T): T = v
}
38 changes: 34 additions & 4 deletions algebird-core/src/main/scala/com/twitter/algebird/Last.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,53 @@ limitations under the License.
package com.twitter.algebird

/**
* Last tracks the "most recent" item by the order in which items are
* seen.
* Tracks the "most recent", or last, wrapped instance of `T` by the
* order in which items are seen.
*
* @param get wrapped instance of `T`
*/
case class Last[@specialized(Int, Long, Float, Double) +T](get: T) {
  /**
   * Combines two [[Last]] instances by keeping the right-hand one.
   *
   * @tparam U element type of the argument, a supertype of `T`
   * @param r the instance of `Last[U]` that is returned
   * @return `r`, always
   */
  def +[U >: T](r: Last[U]): Last[U] = {
    // The receiver is deliberately discarded; only the argument survives.
    r
  }
}

/**
 * Companion of [[Last]]. Inherits its members from `LastInstances`
 * and adds an [[Aggregator]] factory.
 */
object Last extends LastInstances {
  /**
   * Creates a [[LastAggregator]] for element type `T`.
   *
   * @tparam T type of the aggregated items
   */
  def aggregator[T]: LastAggregator[T] = LastAggregator[T]()
}

private[algebird] sealed abstract class LastInstances {
implicit def semigroup[T]: Semigroup[Last[T]] = Semigroup.from { (l, r) => r }
/**
* Returns a [[Semigroup]] instance with a `plus` implementation
* that always returns the last (ie, the right) `T` argument.
*/
def lastSemigroup[T]: Semigroup[T] = Semigroup.from { (l, r) => r }

/**
* Returns a [[Semigroup]] instance for [[Last]][T]. The `plus`
* implementation always returns the last (ie, the right) `Last[T]`
* argument.
*/
implicit def semigroup[T]: Semigroup[Last[T]] = lastSemigroup[Last[T]]
}

/**
* [[Aggregator]] that selects the last instance of `T` in the
* aggregated stream.
*/
case class LastAggregator[T]() extends Aggregator[T, T, T] {
def prepare(v: T) = v
val semigroup: Semigroup[T] = Semigroup.from { (l: T, r: T) => r }
val semigroup: Semigroup[T] = Last.lastSemigroup[T]
def present(v: T) = v
}
Loading