Skip to content

Commit 968ac15

Browse files
committed
Merge pull request #296 from twitter/feature/migrateAsyncSummersFromSB
Feature/migrate async summers from sb
2 parents 48fd31b + 071058e commit 968ac15

13 files changed

+643
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
Copyright 2012 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.algebird.util.summer
17+
18+
import com.twitter.algebird._
19+
import com.twitter.util.{Duration, Future, FuturePool}
20+
import java.util.concurrent.ConcurrentHashMap
21+
import java.util.concurrent.atomic.AtomicInteger
22+
import scala.collection.JavaConverters._
23+
import scala.collection.mutable.{Set => MSet}
24+
/**
25+
* @author Ian O Connell
26+
*/
27+
28+
class AsyncListSum[Key, Value](bufferSize: BufferSize,
29+
override val flushFrequency: FlushFrequency,
30+
override val softMemoryFlush: MemoryFlushPercent,
31+
workPool: FuturePool)
32+
(implicit semigroup: Semigroup[Value])
33+
extends AsyncSummer[(Key, Value), Map[Key, Value]]
34+
with WithFlushConditions[(Key, Value), Map[Key, Value]] {
35+
36+
require(bufferSize.v > 0, "Use the Null summer for an empty async summer")
37+
38+
protected override val emptyResult = Map.empty[Key, Value]
39+
40+
private[this] final val queueMap = new ConcurrentHashMap[Key, IndexedSeq[Value]]()
41+
private[this] final val elementsInCache = new AtomicInteger(0)
42+
43+
override def isFlushed: Boolean = elementsInCache.get == 0
44+
45+
override def flush: Future[Map[Key, Value]] =
46+
workPool {
47+
didFlush // bumps timeout on the flush conditions
48+
// Take a copy of the keyset into a scala set (toSet forces the copy)
49+
// We want this to be safe around uniqueness in the keys coming out of the keys.flatMap
50+
val keys = queueMap.keySet.asScala.toSet
51+
keys.flatMap { k =>
52+
val retV = queueMap.remove(k)
53+
if(retV != null) {
54+
val newRemaining = elementsInCache.addAndGet(retV.size * -1)
55+
Semigroup.sumOption(retV).map(v => (k, v))
56+
}
57+
else None
58+
}.toMap
59+
}
60+
61+
@annotation.tailrec
62+
private[this] final def doInsert(key: Key, vals: IndexedSeq[Value]) {
63+
require(key != null, "Key can not be null")
64+
val success = if (queueMap.containsKey(key)) {
65+
val oldValue = queueMap.get(key)
66+
if(oldValue != null) {
67+
val newValue = vals ++ oldValue
68+
queueMap.replace(key, oldValue, newValue)
69+
} else {
70+
false // Removed between the check above and fetching
71+
}
72+
} else {
73+
// Test if something else has raced into our spot.
74+
queueMap.putIfAbsent(key, vals) == null
75+
}
76+
77+
if(success) {
78+
// Successful insert
79+
elementsInCache.addAndGet(vals.size)
80+
} else {
81+
return doInsert(key, vals)
82+
}
83+
}
84+
85+
def addAll(vals: TraversableOnce[(Key, Value)]): Future[Map[Key, Value]] = {
86+
val prepVals = vals.map { case (k, v) =>
87+
require(k != null, "Inserting a null key?")
88+
(k -> IndexedSeq(v))
89+
} : TraversableOnce[(Key, IndexedSeq[Value])]
90+
91+
val curData = MapAlgebra.sumByKey(prepVals)
92+
93+
curData.foreach { case (k, v) =>
94+
doInsert(k, v)
95+
}
96+
97+
if(elementsInCache.get >= bufferSize.v) {
98+
flush
99+
} else {
100+
Future.value(Map.empty)
101+
}
102+
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Copyright 2012 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.algebird.util.summer
17+
18+
import com.twitter.algebird._
19+
import com.twitter.util.{Duration, Future, FuturePool}
20+
import java.util.concurrent.ArrayBlockingQueue
21+
import scala.collection.mutable.ListBuffer
22+
import scala.collection.JavaConverters._
23+
24+
/**
25+
* @author Ian O Connell
26+
*/
27+
28+
class AsyncMapSum[Key, Value](bufferSize: BufferSize,
29+
override val flushFrequency: FlushFrequency,
30+
override val softMemoryFlush: MemoryFlushPercent,
31+
workPool: FuturePool)
32+
(implicit semigroup: Semigroup[Value])
33+
extends AsyncSummer[(Key, Value), Map[Key, Value]]
34+
with WithFlushConditions[(Key, Value), Map[Key, Value]] {
35+
36+
require(bufferSize.v > 0, "Use the Null summer for an empty async summer")
37+
38+
protected override val emptyResult = Map.empty[Key, Value]
39+
40+
private[this] final val queue = new ArrayBlockingQueue[Map[Key, Value]](bufferSize.v, true)
41+
override def isFlushed: Boolean = queue.size == 0
42+
43+
override def flush: Future[Map[Key, Value]] = {
44+
didFlush // bumps timeout on the flush conditions
45+
val toSum = ListBuffer[Map[Key, Value]]()
46+
queue.drainTo(toSum.asJava)
47+
workPool {
48+
Semigroup.sumOption(toSum).getOrElse(Map.empty)
49+
}
50+
}
51+
52+
def addAll(vals: TraversableOnce[(Key, Value)]): Future[Map[Key, Value]] = {
53+
val curData = Semigroup.sumOption(vals.map(Map(_))).getOrElse(Map.empty)
54+
if(!queue.offer(curData)) {
55+
flush.map { flushRes =>
56+
Semigroup.plus(flushRes, curData)
57+
}
58+
}
59+
else {
60+
Future.value(Map.empty)
61+
}
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
Copyright 2012 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.algebird.util.summer
17+
18+
import com.twitter.algebird._
19+
import com.twitter.util.{Duration, Future}
20+
21+
/**
22+
* @author Ian O Connell
23+
*/
24+
25+
26+
trait AsyncSummer[T, M <: Iterable[T]] { self =>
27+
def flush: Future[M]
28+
def tick: Future[M]
29+
def add(t: T) = addAll(Iterator(t))
30+
def addAll(vals: TraversableOnce[T]): Future[M]
31+
32+
def isFlushed: Boolean
33+
def cleanup: Future[Unit] = Future.Unit
34+
def withCleanup(cleanup: () => Future[Unit]): AsyncSummer[T, M] = {
35+
val oldSelf = self
36+
new AsyncSummerProxy[T, M] {
37+
override val self = oldSelf
38+
override def cleanup = {
39+
oldSelf.cleanup.flatMap { _ => cleanup }
40+
}
41+
}
42+
}
43+
}
44+
45+
trait AsyncSummerProxy[T, M <: Iterable[T]] extends AsyncSummer[T, M] {
46+
def self: AsyncSummer[T, M]
47+
def flush = self.flush
48+
def tick = self.tick
49+
override def add(t: T) = self.add(t)
50+
def addAll(vals: TraversableOnce[T]) = self.addAll(vals)
51+
def isFlushed = self.isFlushed
52+
override def cleanup: Future[Unit] = self.cleanup
53+
}
54+
55+
56+
private[summer] trait WithFlushConditions[T, M <: Iterable[T]] extends AsyncSummer[T, M] {
57+
protected var lastDump:Long = System.currentTimeMillis
58+
protected def softMemoryFlush: MemoryFlushPercent
59+
protected def flushFrequency: FlushFrequency
60+
protected def emptyResult: M
61+
62+
protected def timedOut = (System.currentTimeMillis - lastDump) >= flushFrequency.v.inMilliseconds
63+
protected lazy val runtime = Runtime.getRuntime
64+
65+
protected def didFlush {lastDump = System.currentTimeMillis}
66+
67+
protected def memoryWaterMark = {
68+
val used = ((runtime.totalMemory - runtime.freeMemory).toDouble * 100) / runtime.maxMemory
69+
used > softMemoryFlush.v
70+
}
71+
72+
def tick: Future[M] = {
73+
if (timedOut || memoryWaterMark) {
74+
flush
75+
}
76+
else {
77+
Future.value(emptyResult)
78+
}
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
Copyright 2012 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.algebird.util.summer
17+
18+
import com.twitter.util.Duration
19+
20+
/**
21+
* @author Ian O Connell
22+
*/
23+
24+
case class BufferSize(v: Int)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
Copyright 2012 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.algebird.util.summer
17+
18+
import com.twitter.util.Duration
19+
20+
/**
21+
* @author Ian O Connell
22+
*/
23+
24+
case class FlushFrequency(v: Duration)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
Copyright 2012 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.algebird.util.summer
17+
18+
19+
/**
20+
* @author Ian O Connell
21+
*/
22+
23+
case class MemoryFlushPercent(v: Float)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright 2012 Twitter, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.twitter.algebird.util.summer
17+
18+
import com.twitter.algebird._
19+
import com.twitter.util.Future
20+
21+
/**
22+
* @author Ian O Connell
23+
*/
24+
25+
class NullSummer[Key, Value](implicit semigroup: Semigroup[Value])
26+
extends AsyncSummer[(Key, Value), Map[Key, Value]] {
27+
def flush: Future[Map[Key, Value]] = Future.value(Map.empty)
28+
def tick: Future[Map[Key, Value]] = Future.value(Map.empty)
29+
def addAll(vals: TraversableOnce[(Key, Value)]): Future[Map[Key, Value]] =
30+
Future.value(Semigroup.sumOption(vals.map(Map(_))).getOrElse(Map.empty))
31+
override val isFlushed: Boolean = true
32+
}
33+

0 commit comments

Comments
 (0)