Implement StreamSummary data structure for finding frequent/top-k items #259


Merged
merged 5 commits into from Feb 26, 2014

Conversation

koertkuipers
Contributor

No description provided.


// merge two stream summaries
// defer the creation of buckets since more pairwise merges might be necessary and buckets are not used for those
private def merge(x: SSMany[T]): SSMany[T] = {
Contributor
I'm worried that this merge operation is not associative. I'm not sure how much that will come up in practice, but as a pathological case: say you had a summary for each day, and each day's top m items were unique to it, but the (m+1)th most frequent item was the same for each one. That item should be the most frequent overall, but you'll never find that out by merging daily summaries.
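The pathological case can be made concrete with invented numbers (a sketch in Python, not Algebird code; the items and counts are hypothetical):

```python
# Hypothetical per-day true counts, with m = 2: each day's summary keeps
# only its top 2 items, so "z" (ranked 3rd every day) is never retained.
days = [
    {"a1": 10, "a2": 9, "z": 8},  # day 1
    {"b1": 10, "b2": 9, "z": 8},  # day 2
    {"c1": 10, "c2": 9, "z": 8},  # day 3
]

totals = {}
for day in days:
    for item, count in day.items():
        totals[item] = totals.get(item, 0) + count

# Overall, z (24 occurrences) beats every daily top item (10 each),
# yet no daily summary ever kept it.
print(max(totals, key=totals.get))  # -> z
```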

Contributor Author

agreed. let me go over the merge again and make sure it's right


Contributor Author

i will check, but i believe that

  1. you are correct that you would never find out it's the most frequent overall
  2. however, the error bars on the most frequent items that it did find would reflect that uncertainty. basically the data structure would admit it was not able to say much useful at all.


Contributor

Ah, that's interesting, and not intuitively obvious to me. Would love to see a demonstration of this.

Contributor Author

ok, here is an attempt at demonstrating it:

to illustrate, say you have 2 stream summaries (so parallelism is 2).

errors are reported as possible overcounting, so a count of 30 with an error of 10 means the true count was in the range [20, 30].

the most frequent item in stream summary 1 is coffee with count 40 and error 0, and the minimum count is 30.
the most frequent item in stream summary 2 is tea with count 38 and error 0, and the minimum count is 29.

assume the true most frequent item is hidden in those minimum counts, and it is not present in either data structure. it could have appeared up to 30 + 29 times. after a merge it would report coffee with a count of 40 + 29 and an error of 29, and tea with a count of 38 + 30 and an error of 30.

any unknown item (not kept in either stream summary) would report a count of 30 + 29 with an error of 30 + 29.

so you get these ranges:
coffee [40, 69]
tea [38, 68]
unknown [0, 59]

this shows that coffee can be replaced by any unknown item as the most frequent one. the data structure this way admits it doesn't know.
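The arithmetic above can be checked mechanically. A small Python sketch (not Algebird code; the counts are taken from the example, and `(count, error)` pairs denote a true count in `[count - error, count]`):

```python
# Each summary reports (count, error); the true count lies in
# [count - error, count]. left_min / right_min are each summary's
# minimum kept count.
left = {"coffee": (40, 0)}   # summary 1
right = {"tea": (38, 0)}     # summary 2
left_min, right_min = 30, 29

def merged(count, error, other_min):
    # An item kept on one side but unseen on the other could have
    # occurred up to other_min times over there.
    return count + other_min, error + other_min

ranges = {
    "coffee": merged(*left["coffee"], right_min),
    "tea": merged(*right["tea"], left_min),
    # An item kept in neither summary: count and error are both at most
    # left_min + right_min.
    "unknown": (left_min + right_min, left_min + right_min),
}
for item, (count, error) in ranges.items():
    print(item, [count - error, count])
# coffee [40, 69], tea [38, 68], unknown [0, 59]
```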


Contributor Author

note that, and this is the tricky part, the merge does not keep all items. it truncates to keep the map size the same (otherwise a merge could result in things getting out of hand in terms of size). what this means -i believe- is that after the merge the bottom element in the structure might have a count higher than 30 + 29, and the result would be that for an unknown item it would report count x with error x for some x > (30 + 29). so there is a loss in precision, but it is still correct.


Contributor

Your first note made sense, and I can't find a flaw in the argument (and, cool! The addition of per-element error is a neat innovation here). I don't follow the second part yet; can you elaborate?

Contributor Author

the merge combines the 2 counters maps. but notice the result is truncated after sorting:
.sorted(StreamSummary.ordering)
.take(capacity)

to me the hardest part was convincing myself the resulting structure after the sort and take was correct. it is easy to convince yourself it is correct for items that are in either map, or in both.

for items that are in neither, the newly reported count (and error, which will be equal to the count) after the merge of the 2 maps, but before the take operation, will be the maximum of:

  • lowest count on left plus min on right (if the item with the lowest count on left was not present on right), so left.min + right.min
  • lowest count on left plus actual count on right (if the item with the lowest count on left was present on right). if an item is present on right it must have count >= right.min, and therefore the result is >= left.min + right.min
  • lowest count on right plus min on left (if the item with the lowest count on right was not present on left), which is equal to left.min + right.min
  • lowest count on right plus actual count on left (if the item with the lowest count on right was present on left). if an item is present on left it must have count >= left.min, and therefore the result is >= left.min + right.min

so in all cases the resulting lowest_count >= left.min + right.min.

after truncating with the take operation the number can only go up (since the items are sorted by count desc). since lowest_count becomes min on the merged object, we now have merge.min >= left.min + right.min, which means for unknown items the domain will be [0, merge.min] for some merge.min >= left.min + right.min.

i believe this is always correct, and the lowest bound possible given the need to truncate (take operation) to maintain structure sizing...

reading it again it's a bit confusing. not sure if this helps...
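The four cases above reduce to a single inequality that can be spot-checked with invented numbers (a sketch; the specific counts are hypothetical, chosen to satisfy the constraints in each case):

```python
# Spot-check the four cases: for an item absent from both summaries, the
# reported count after merging (before the take) is bounded below by
# left_min + right_min in every case.
left_min, right_min = 30, 29

cases = [
    # (case description, resulting lowest count)
    ("lowest on left, absent on right", left_min + right_min),
    ("lowest on left, present on right with count 35", left_min + 35),   # 35 >= right_min
    ("lowest on right, absent on left", right_min + left_min),
    ("lowest on right, present on left with count 32", right_min + 32),  # 32 >= left_min
]
for name, lowest_count in cases:
    assert lowest_count >= left_min + right_min, name

# Truncation (sort desc by count, then take) can only discard low counts,
# so merge_min >= left_min + right_min; unknown items report [0, merge_min].
print("all cases satisfy the bound")
```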


Contributor Author

i should have simply said:

after merging the 2 maps but before the take operation, the new minimum count is going to be the sum of the minimum counts on left and right, or possibly higher (if all the items that had minimum counts on left had higher counts on right and vice versa).

truncating after a sort descending by count can only raise this minimum count (a reflection of information loss due to the truncation). so the new minimum count is guaranteed to be at least the sum of the minimum counts on both sides.

since merge.min >= left.min + right.min, for an unknown item we had before that its count on left would be in the range [0, left.min] and on right in the range [0, right.min], which summed up leads to the range [0, left.min + right.min]. now it's [0, merge.min], which is guaranteed to encompass the old estimate (and possibly be less precise).


Contributor

Got it. Thanks.

@avibryant
Contributor

I'm strongly in favor of having something like this in here, but see my worries about associativity. (Note that with SketchMap, we basically just ignored this concern, but I think it's somewhat more likely to be a problem in practice with this).

@@ -225,4 +225,5 @@ object Monoid extends GeneratedMonoidImplicits with ProductMonoids {
implicit def jmapMonoid[K,V : Semigroup] = new JMapMonoid[K,V]
implicit def eitherMonoid[L : Semigroup, R : Monoid] = new EitherMonoid[L, R]
implicit def function1Monoid[T] = new Function1Monoid[T]
implicit def streamSummaryMonoid[T]: Monoid[StreamSummary[T]] = new StreamSummaryMonoid[T]
Collaborator

this should not go here. Only objects we don't control go here. Otherwise, put it in the companion object for StreamSummary.

Contributor Author

ok i misunderstood that one. will put it in companion.


@avibryant
Contributor

Can we pick a different name than StreamSummary? Should we just call it SpaceSaver, since it's very close to that?

Also, I feel like we need a term for things like this or QTree which are not strictly speaking associative, but obey the weaker constraint that they can be merged without sacrificing the correctness of their estimates (just possibly losing precision) - vs on the one hand HyperLogLog, which is perfectly associative, or on the other hand SketchMap, which can simply be incorrect about the heavy hitters.

@koertkuipers
Contributor Author

SpaceSaver sounds good and is less generic than StreamSummary. i will make that change


@avibryant
Contributor

We should get this merged; @johnynek any remaining blockers?

@avibryant avibryant mentioned this pull request Feb 20, 2014
/**
* Maximum number of counters to keep (parameter "m" in the research paper).
*/
def capacity: Int
Collaborator

is this not an Option[Long]?

Collaborator

Rather than make this an Option, why not just add it directly to SSOne and SSMany? Might be cleaner that way rather than jam it onto zero.

Collaborator

What about making this a semigroup? Those can be made monoids by wrapping them in Option. Isn't that what is happening here (you just have a named type)?

Just a thought.

Contributor Author

ok will make it semigroup and remove SSZero

@johnynek
Collaborator

Sorry for the delay in reviewing this.

  1. We do need to name structures like op: (T, T) => T with the property that op(a, op(b, c)) ~ op(op(a, b), c). Something like effectively associative, or nearly associative. NearSemigroup? NearMonoid? It is perfectly sensible, but we could get lost in the definition of when something is close enough, or what we mean by this (if we randomly choose a triple, what is the probability they are associative? Or maybe: Metric( (a + b) + c, a + (b + c) ) < eps for some metric and eps?)

  2. Should we make a new package now? algebird.almost?

…ior, secondary constructors, and beefing up unit test to include SSMany instances for semigroup laws
@koertkuipers
Contributor Author

mhhh i got one test failure:
[info] ! Collections.IndexedSeq is a pseudoRing: Falsified after 6 passed tests.

not sure what that is about

@johnynek
Collaborator

That was a real bug. Added an issue, restarted CI.

* Construct SpaceSaver with given capacity containing a single item.
*/
def apply[T](capacity: Int, item: T): SpaceSaver[T] = SSOne(capacity, item)

Collaborator

any reason we should not add:

implicit def semigroup[T]: Semigroup[SpaceSaver[T]] = new SpaceSaverSemigroup[T]

and remove the explicit setup in the test?

@johnynek
Collaborator

If you add the implicit def in the object, I will merge.

Thanks for working on this.

johnynek added a commit that referenced this pull request Feb 26, 2014
Implement StreamSummary data structure for finding frequent/top-k items
@johnynek johnynek merged commit 56f579e into twitter:develop Feb 26, 2014
@jaredwinick

If the code was modified such that SSOne looked like

case class SSOne[T](capacity: Int, item: Option[T]) extends SpaceSaver[T] 

and this change was carried through, we could create a SpaceSaverMonoid, right? It seems like that might work but I am still trying to figure things out. Thanks for any guidance.

@koertkuipers
Contributor Author

you would need a zero element for the monoid. so create an SSZero object.

alternatively you can wrap SpaceSaver in an Option, so the None becomes your zero. see OptionMonoid in Algebird to create the monoid from the semigroup. something like this:

def spaceSaverMonoid[T]: Monoid[Option[SpaceSaver[T]]] = new OptionMonoid[SpaceSaver[T]]()(new SpaceSaverSemigroup[T])


@jaredwinick

great, thanks for the pointers, I appreciate it!

