Optimize the storage backend used in sketch map #301

Merged 2 commits on Apr 15, 2014
@@ -17,6 +17,7 @@ limitations under the License.
package com.twitter.algebird

import scala.collection.breakOut
+ import com.twitter.algebird.matrix.AdaptiveMatrix

/**
* A Sketch Map is a generalized version of the Count-Min Sketch that is an
@@ -159,8 +160,8 @@ case class SketchMapParams[K](seed: Int, width: Int, depth: Int, heavyHittersCou
def frequency[V:Ordering](key: K, table: AdaptiveMatrix[V]): V =
hashes
.view
- .zip(table.rowsByColumns)
- .map { case (hash, row) => row(hash(key)) }
+ .zipWithIndex
+ .map { case (hash, row) => table.getValue(row, hash(key)) }
.min

/**
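
The hunk above changes how `frequency` reads a cell: instead of indexing into each row directly, it pairs every hash with its row index and asks the table for the value. As a rough illustration of that lookup, here is a minimal sketch that uses a plain `Vector[Vector[Long]]` in place of `AdaptiveMatrix`. The object name `FrequencySketch` and the example hash functions are assumptions made for this sketch and are not part of Algebird.

```scala
object FrequencySketch {
  // Toy hash functions, one per row, each mapping a key to a column index.
  // These are illustrative assumptions, not the hashes Algebird uses.
  def hashes(width: Int): Vector[String => Int] = Vector(
    (k: String) => math.abs(k.hashCode % width),
    (k: String) => math.abs((k.hashCode * 31 + 7) % width)
  )

  // The estimate is the minimum, over all rows, of the cell that the
  // row's hash function selects for this key.
  def frequency(key: String, table: Vector[Vector[Long]], hs: Vector[String => Int]): Long =
    hs.zipWithIndex
      .map { case (hash, row) => table(row)(hash(key)) }
      .min
}
```

Taking the minimum across rows is what bounds the overestimate in a Count-Min style structure: a collision can only inflate a cell, so the smallest cell is the tightest estimate available.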
@@ -0,0 +1,120 @@
/*
Copyright 2012 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.algebird.matrix

import scala.collection.mutable.{ArrayBuffer, Map => MMap}
import com.twitter.algebird.{AdaptiveVector, Monoid}

/**
* A Matrix structure that is designed to hide moving between sparse and dense representations
* Initial support here is focused on a dense row count with a sparse set of columns
*/

abstract class AdaptiveMatrix[V: Monoid] {
def rows: Int
def cols: Int
def size = rows * cols

def getValue(position: (Int, Int)): V

def updateInto(buffer: ArrayBuffer[V]): Unit

def updated(position: (Int, Int), value: V): AdaptiveMatrix[V]
}

object AdaptiveMatrix {
def zero[V: Monoid](rows: Int, cols: Int) = fill(rows, cols)(implicitly[Monoid[V]].zero)

def fill[V: Monoid](rows: Int, cols: Int)(fill: V): AdaptiveMatrix[V] = {
SparseColumnMatrix(Vector.fill(rows)(AdaptiveVector.fill[V](cols)(fill)))
}

def empty[V: Monoid](): AdaptiveMatrix[V] = {
SparseColumnMatrix(IndexedSeq[AdaptiveVector[V]]())
}

// The adaptive monoid to swap between sparse modes.
implicit def monoid[V:Monoid]: Monoid[AdaptiveMatrix[V]] = new Monoid[AdaptiveMatrix[V]] {
private[this] final val innerZero = implicitly[Monoid[V]].zero
Contributor

Is there a reason you generally prefer the implicitly[Monoid... approach vs. Monoid.zero[V], and so on?

Collaborator Author

Not really. Mostly, this ensures it's just a field access for the zero constant in here if we hit it often, rather than potentially being a def/virtual call.

Contributor

How smart is implicitly? I always thought it was just something like

def implicitly[T](implicit t: T) = t

which would mean that there is still a virtual call. Is the Scala compiler actually doing something smarter here, like making a direct reference to the implicit value that is in scope?

Collaborator Author

Right, but I assign it to a final val, which is then used in any loops/accesses within the monoid, so for those it should be a constant. I could have used Monoid.zero[T] for the constant here too. Sorry, I think I might have answered a different question than the one asked. Put into this line, I presume the two are equivalent.

Contributor

Ok, yeah, I think we diverged, but I am with you.
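
To make the point of this thread concrete, here is a minimal sketch of the pattern being discussed: the implicit instance is looked up once, its zero is stored in a `val`, and the hot loop reads that stored value. The `Monoid` trait below is a stand-in written for this example so that it compiles on its own; `myImplicitly` and `Summer` are hypothetical names, not Algebird APIs.

```scala
object ImplicitlySketch {
  // Stand-in for Algebird's Monoid (an assumption for this sketch).
  trait Monoid[T] {
    def zero: T
    def plus(a: T, b: T): T
  }

  object Monoid {
    implicit val intMonoid: Monoid[Int] = new Monoid[Int] {
      def zero: Int = 0
      def plus(a: Int, b: Int): Int = a + b
    }
  }

  // Same shape as the definition quoted in the thread: it simply returns
  // the implicit argument, so it does nothing clever by itself.
  def myImplicitly[T](implicit t: T): T = t

  class Summer[T: Monoid] {
    private[this] val m = myImplicitly[Monoid[T]]
    // Resolved once at construction; the loop below reads this stored
    // value rather than calling m.zero on every iteration.
    private[this] val innerZero: T = m.zero

    def sum(xs: Seq[Option[T]]): T = {
      var acc = innerZero
      val it = xs.iterator
      while (it.hasNext) {
        acc = m.plus(acc, it.next().getOrElse(innerZero))
      }
      acc
    }
  }
}
```

Whether the stored value is measurably faster than calling `zero` each time depends on the monoid instance and on the JIT, so it is worth benchmarking before relying on it.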


override def zero: AdaptiveMatrix[V] = SparseColumnMatrix[V](IndexedSeq[AdaptiveVector[V]]())

override def plus(a: AdaptiveMatrix[V], b: AdaptiveMatrix[V]) = sumOption(List(a, b)).get

private def denseInsert(rows: Int, cols: Int, buff: ArrayBuffer[V], remainder: Iterator[AdaptiveMatrix[V]]): Option[AdaptiveMatrix[V]] = {
remainder.foreach(_.updateInto(buff))
Some(DenseMatrix(rows, cols, buff))
}

private def denseUpdate(current: AdaptiveMatrix[V], remainder: Iterator[AdaptiveMatrix[V]]): Option[AdaptiveMatrix[V]] = {
val rows = current.rows
val cols = current.cols
val buffer = ArrayBuffer.fill(rows * cols)(innerZero)
current.updateInto(buffer)
denseInsert(rows, cols, buffer, remainder)
}

private def sparseUpdate(storage: IndexedSeq[MMap[Int, V]], other: SparseColumnMatrix[V]) = {
other.rowsByColumns.zipWithIndex.foreach { case (contents, indx) =>
val curMap: MMap[Int, V] = storage(indx)
AdaptiveVector.toMap(contents).foreach { case (col, value) =>
curMap.update(col, Monoid.plus(value, curMap.getOrElse(col, innerZero)))
}
}
}

private def goDense(rows: Int, cols: Int, storage: IndexedSeq[MMap[Int, V]], remainder: Iterator[AdaptiveMatrix[V]]): Option[AdaptiveMatrix[V]] = {
val buffer = ArrayBuffer.fill(rows * cols)(innerZero)
var row = 0
val iter = storage.iterator
while(iter.hasNext) {
val curRow = iter.next
curRow.foreach { case (col, value) =>
buffer(row*cols + col) = value
}
row += 1
}
denseInsert(rows, cols, buffer, remainder)
}

override def sumOption(items: TraversableOnce[AdaptiveMatrix[V]]): Option[AdaptiveMatrix[V]] =
if(items.isEmpty) {
None
} else {
val iter = items.toIterator.buffered
val rows = iter.head.rows
val cols = iter.head.cols
val sparseStorage = (0 until rows).map{_ => MMap[Int, V]()}.toIndexedSeq

while(iter.hasNext) {
val current = iter.next
current match {
case d@DenseMatrix(_, _, _) => return denseUpdate(d, iter)
case s@SparseColumnMatrix(_) =>
sparseUpdate(sparseStorage, s)
if(sparseStorage(0).size > current.cols/4) {
Contributor

Perhaps this logic should be in a method, a constant, or whatever?

Collaborator Author

Well, this is the only place it exists in the code I modified. If we want some shared notion across all our sparse/adaptive systems of when to go dense, then maybe. It could also be a parameter. I'm not really sure.

return goDense(rows, cols, sparseStorage, iter)
}
}
}

// Need to still be sparse to reach here, so must unpack the MMap to be used again.
Some(SparseColumnMatrix.fromSeqMap(cols, sparseStorage))
}
}
}
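
The review thread on `sparseStorage(0).size > current.cols/4` asks whether the density check deserves a name. One possible shape, sketched under assumptions: values are fixed to `Long` for brevity, and `DensifySketch`, `DenseThresholdDivisor`, `shouldGoDense`, and `flatten` are hypothetical names that do not appear in the PR. The `nonEmpty` guard is also an addition of this sketch; the code in the diff indexes row 0 unconditionally.

```scala
import scala.collection.mutable.{ArrayBuffer, Map => MMap}

object DensifySketch {
  // Hypothetical named constant for the ratio written inline as cols/4.
  val DenseThresholdDivisor = 4

  // True once the first row holds more entries than cols / divisor.
  // Like the check in the diff, only row 0 is inspected.
  def shouldGoDense(storage: IndexedSeq[MMap[Int, Long]], cols: Int): Boolean =
    storage.nonEmpty && storage(0).size > cols / DenseThresholdDivisor

  // Copy per-row sparse maps into one row-major buffer, in the same
  // way goDense fills its buffer before handing off to denseInsert.
  def flatten(rows: Int, cols: Int, storage: IndexedSeq[MMap[Int, Long]]): ArrayBuffer[Long] = {
    val buffer = ArrayBuffer.fill(rows * cols)(0L)
    var row = 0
    while (row < rows) {
      storage(row).foreach { case (col, value) => buffer(row * cols + col) = value }
      row += 1
    }
    buffer
  }
}
```

Pulling the check into one function would also give a single place to turn the divisor into a parameter, which is the other option raised in the thread.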

@@ -0,0 +1,43 @@
/*
Copyright 2012 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.algebird.matrix
import scala.collection.mutable.{ArrayBuffer, Map => MMap}

import com.twitter.algebird.Monoid

case class DenseMatrix[V: Monoid](rows: Int, cols: Int, rowsByColumns: IndexedSeq[V]) extends AdaptiveMatrix[V] {
Contributor

Is there a reason you prefer an IndexedSeq here to an Array[V]? Is it just the translation cost into and out of DenseMatrix? It seems like the Array[V] is potentially cheaper (if we specialize)?

Collaborator Author

An Array is mutable; the closest immutable equivalent is IndexedSeq, I believe. It can be backed by an array, however.

Contributor

Ah, yes I suppose we are hiding all of the mutable stuff from the user. We can probably get rid of more indirection in the future by exposing that, if it's a win.

Collaborator Author

Aye, though without Rust-style ownership it's hard to leverage most of the gains.

Contributor

If you say @johnynek's name 3 times fast, he pops out and starts talking about Rust or Haskell.

val valueMonoid = implicitly[Monoid[V]]

private[this] def tupToIndex(position: (Int, Int)) = position._1 * cols + position._2

override def getValue(position: (Int, Int)): V = rowsByColumns(tupToIndex(position))

override def updated(position: (Int, Int), value: V): DenseMatrix[V] =
DenseMatrix[V](rows, cols, rowsByColumns.updated(tupToIndex(position), value))


override def updateInto(buffer: ArrayBuffer[V]) {
var indx = 0
val lsize = size
while(indx < lsize) {
buffer(indx) = valueMonoid.plus(buffer(indx), rowsByColumns(indx))
indx += 1
}
}

}
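
`DenseMatrix` stores its cells in one flat sequence and converts a `(row, col)` position with `row * cols + col`. The sketch below isolates that arithmetic and pairs it with an immutable update, which is the trade-off discussed in the thread on `IndexedSeq` versus `Array`. `RowMajorSketch` and its helpers are hypothetical names for this example, and `Vector` is used as one concrete immutable `IndexedSeq`.

```scala
object RowMajorSketch {
  // Flatten a (row, col) position into an index of a rows * cols sequence.
  def toIndex(position: (Int, Int), cols: Int): Int =
    position._1 * cols + position._2

  // Inverse mapping, handy for checking the arithmetic.
  def fromIndex(index: Int, cols: Int): (Int, Int) =
    (index / cols, index % cols)

  // Immutable update: returns a new Vector and leaves the original untouched.
  def updated[V](data: Vector[V], cols: Int, position: (Int, Int), value: V): Vector[V] =
    data.updated(toIndex(position, cols), value)
}
```

With an `Array` the update would happen in place and avoid the copy, but callers could then observe the mutation, which is the property the immutable representation is protecting.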

@@ -0,0 +1,65 @@
/*
Copyright 2012 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.algebird.matrix
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
import com.twitter.algebird.{Monoid, AdaptiveVector}

object SparseColumnMatrix {
def fromSeqMap[V: Monoid](cols: Int, data: IndexedSeq[MMap[Int, V]]) = {
val monoidZero = implicitly[Monoid[V]].zero
SparseColumnMatrix(data.map { mm =>
AdaptiveVector.fromMap(mm.toMap, monoidZero, cols)
}.toIndexedSeq)
}
}

case class SparseColumnMatrix[V: Monoid](rowsByColumns: IndexedSeq[AdaptiveVector[V]]) extends AdaptiveMatrix[V] {
/** Row is the outer Seq, the columns are the inner vectors. */

val valueMonoid = implicitly[Monoid[V]]

override def rows: Int = rowsByColumns.size

override def cols: Int = rowsByColumns(0).size

def getValue(position: (Int, Int)): V = rowsByColumns(position._1)(position._2)

def updated(position: (Int, Int), value: V): SparseColumnMatrix[V] = {
val (row, col) = position
SparseColumnMatrix[V](rowsByColumns.updated(row, rowsByColumns(row).updated(col, value)))
}

override def updateInto(buffer: ArrayBuffer[V]) {
val lcols = cols
var row = 0
while(row < rows) {
Contributor

Is there a reason to prefer a while to a 0.until(rows).foreach type thing? Is there a performance difference? Do you find this while more readable?

Collaborator Author

It's faster: no closures or allocations.

val iter = rowsByColumns(row).denseIterator
while(iter.hasNext) {
Contributor

Is there a reason to prefer a while to a foreach here?

Collaborator Author

While is faster, no closures

Contributor

If this was Haskell we wouldn't even be worrying about that ;) Gotcha.

Collaborator Author

Alas, yeah, it can actually be a big thing for tight loops in Scala. Really, the compiler should just optimize it. There was a project for an optimizing compiler plugin to do it, but it appeared dead last I saw.

val (col, value) = iter.next
val indx = row * lcols + col
buffer(indx) = valueMonoid.plus(buffer(indx), value)
}
row += 1
}
}

def toDense: DenseMatrix[V] = {
val buf = ArrayBuffer.fill(size)(valueMonoid.zero)
updateInto(buf)
DenseMatrix(rows, cols, buf)
}
}
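
The two review threads on `while` versus `foreach` in `updateInto` compare loop styles that compute the same result. A minimal side-by-side sketch, with `LoopSketch` and both method names being hypothetical and the summing task chosen only for illustration:

```scala
object LoopSketch {
  // Closure-based version: the loop body is passed to foreach as a function.
  def sumForeach(xs: Array[Long]): Long = {
    var total = 0L
    (0 until xs.length).foreach { i => total += xs(i) }
    total
  }

  // while-based version: plain index arithmetic, no function value for the body.
  def sumWhile(xs: Array[Long]): Long = {
    var total = 0L
    var i = 0
    while (i < xs.length) {
      total += xs(i)
      i += 1
    }
    total
  }
}
```

Both return the same value. How large the gap is in practice depends on the Scala compiler version and on JIT inlining, so a benchmark on the actual workload is the safer guide.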