Skip to content

Add sumOption support for Tuples #242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 3, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2012 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.algebird

import scala.collection.mutable.ArrayBuffer

/** Represents something that consumes I and may emit O. Has some internal
* state that may be used to improve performance.
* Generally used to model folds or reduces (see BufferedReduce)
*/
trait Buffered[I, O] extends java.io.Serializable {
def put(i: I): Option[O]
def flush: Option[O]
def isFlushed: Boolean
}

abstract class ArrayBufferedOperation[I, O](size: Int) extends Buffered[I, O] {
def operate(nonEmpty: Seq[I]): O

require(size > 0, "buffer <= 0 not allowed")

private val buffer = new ArrayBuffer[I](size)

def put(item: I): Option[O] = {
buffer += item
if(buffer.size >= size) flush
else None
}

def flush: Option[O] =
if(buffer.isEmpty) None
else {
val res = operate(buffer)
buffer.clear
Some(res)
}

def isFlushed = buffer.isEmpty
}

/**
* This never emits on put, you must call flush
* designed to be use in the stackable pattern with ArrayBufferedOperation
*/
trait BufferedReduce[V] extends Buffered[V, V] {
abstract override def put(item: V) = {
val res = super.put(item)
// avoiding closures for performance critical code:
if(res.isDefined) put(res.get)
else None
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,13 @@ package com.twitter.algebird
* Law 2: isFlushed == flush.isEmpty
* @author Oscar Boykin
*/
trait StatefulSummer[V] extends java.io.Serializable {
trait StatefulSummer[V] extends Buffered[V, V] {
def semigroup: Semigroup[V]
// possibly emit a partially summed value
def put(item: V): Option[V]
// All state is reset at this point and return the rest of the sum
def flush: Option[V]
def isFlushed: Boolean
}

/** Sum the entire iterator one item at a time. Only emits on flush
*/
* you should probably prefer BufferedSumAll
*/
class SumAll[V](implicit override val semigroup: Semigroup[V]) extends StatefulSummer[V] {
var summed: Option[V] = None
def put(item: V) = {
Expand All @@ -49,3 +45,11 @@ class SumAll[V](implicit override val semigroup: Semigroup[V]) extends StatefulS
}
def isFlushed = summed.isEmpty
}

class BufferedSumAll[V](size: Int)(implicit override val semigroup: Semigroup[V])
extends ArrayBufferedOperation[V, V](size)
with StatefulSummer[V]
with BufferedReduce[V] {

def operate(nonEmpty: Seq[V]): V = semigroup.sumOption(nonEmpty).get
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ object BaseProperties {
def isAssociative[T : Semigroup : Arbitrary] = isAssociativeEq[T](defaultEq _)

def semigroupSumWorks[T:Semigroup:Arbitrary:Equiv] = forAll { (in: List[T]) =>
Equiv[Option[T]].equiv(Semigroup.sumOption(in), in.reduceLeftOption(Semigroup.plus(_,_)))
in.isEmpty || {
Equiv[T].equiv(Semigroup.sumOption(in).get, in.reduceLeft(Semigroup.plus(_,_)))
}
}

def isCommutativeEq[T : Semigroup : Arbitrary](eqfn: (T,T) => Boolean) = forAll { (a:T,b:T)=>
Expand Down Expand Up @@ -89,11 +91,12 @@ object BaseProperties {
}
def validZero[T : Monoid : Arbitrary] = validZeroEq[T](defaultEq _)

def monoidLaws[T : Monoid : Arbitrary] = validZero[T] && isAssociative[T] && isNonZeroWorksMonoid[T]
def monoidLaws[T : Monoid : Arbitrary] = validZero[T] && semigroupLaws[T] && isNonZeroWorksMonoid[T]
def monoidLawsEq[T : Monoid : Arbitrary](eqfn : (T,T) => Boolean) =
validZeroEq[T](eqfn) && isAssociativeEq[T](eqfn)
validZeroEq[T](eqfn) && semigroupLawsEq[T](eqfn)

def commutativeMonoidLawsEq[T : Monoid : Arbitrary](eqfn : (T,T) => Boolean) =
validZeroEq[T](eqfn) && isAssociativeEq[T](eqfn) && isCommutativeEq[T](eqfn)
monoidLawsEq[T](eqfn) && isCommutativeEq[T](eqfn)
def commutativeMonoidLaws[T:Monoid:Arbitrary] = commutativeMonoidLawsEq[T](defaultEq _)

def hasAdditiveInverses[T: Group : Arbitrary] = forAll { (a : T) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object HyperLogLogLaws extends Properties("HyperLogLog") {
) yield (hllMonoid(v))
}

property("HyperLogLog is a Monoid") = monoidLaws[HLL]
property("HyperLogLog is a Monoid") = monoidLawsEq[HLL]{_.toDenseHLL == _.toDenseHLL}
}

class HyperLogLogTest extends Specification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ object SketchMapLaws extends Properties("SketchMap") {
for (key: Int <- choose(0, 10000)) yield (smMonoid.create(key, 1L))
}

property("SketchMap is a Monoid") = monoidLaws[SketchMap[Int, Long]]
// TODO: SketchMap's heavy hitters are not strictly associative (approximately they are)
property("SketchMap is a Monoid") = commutativeMonoidLawsEq[SketchMap[Int, Long]] { (left, right) =>
(left.valuesTable == right.valuesTable) &&
(left.params == right.params) &&
(left.totalValue == right.totalValue)
}
}


Expand Down
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object AlgebirdBuild extends Build {
def youngestForwardCompatible(subProj: String) =
Some(subProj)
.filterNot(unreleasedModules.contains(_))
.map { s => "com.twitter" % ("algebird-" + s + "_2.9.3") % "0.2.0" }
.map { s => "com.twitter" % ("algebird-" + s + "_2.9.3") % "0.3.1" }

lazy val algebird = Project(
id = "algebird",
Expand Down
21 changes: 21 additions & 0 deletions scripts/ntuple_generators.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ def get_operation(n, algebraic_structure, operation)
"override def #{operation}(#{method_params}) = #{values_commaed}"
end

def get_sumoption(n, bufferSize)
# Example: "(T, U)"
individual_element_type = "(#{TYPE_SYMBOLS.first(n).join(", ")})"
# Example: "items : TraversableOnce[(T, U)]"
method_params = "to : TraversableOnce[#{individual_element_type}]"
# Example: "tsemigroup.sumOption(items.iterator.map(_._1), "
values_commaed = TYPE_SYMBOLS.first(n).each_with_index.map do |t, i|
"#{t.downcase}semigroup.sumOption(items.iterator.map(_._#{i+1})).get"
end.join(", ")

"override def sumOption(#{method_params}) = {
val buf = new ArrayBufferedOperation[#{individual_element_type}, #{individual_element_type}](#{bufferSize}) with BufferedReduce[#{individual_element_type}] {
def operate(items: Seq[#{individual_element_type}]) =
(#{values_commaed})
}
to.foreach(buf.put(_))
buf.flush
}"
end

# Example return:
# "implicit def pairMonoid[T,U](implicit tg : Monoid[T], ug : Monoid[U]) : Monoid[(T,U)] = {
# new Tuple2Monoid[T,U]()(tg,ug)
Expand All @@ -134,6 +154,7 @@ def print_class_definitions
#{get_comment(tuple_size, "semigroup")}
#{get_class_definition(tuple_size, "semigroup")} {
#{get_operation(tuple_size, "semigroup", "plus")}
#{get_sumoption(tuple_size, 1000)}
}

#{get_comment(tuple_size, "monoid")}
Expand Down