Skip to content

Commit dbb275b

Browse files
sagelywizardRobert Newson
authored and
Robert Newson
committed
Stop using ElasticBlockingQueue
ElasticBlockingQueue will drop messages when no thread is polling the queue. These changes replace ElasticBlockingQueue with java.util.concurrent.LinkedBlockingQueue.
1 parent e599203 commit dbb275b

File tree

2 files changed

+7
-8
lines changed

2 files changed

+7
-8
lines changed

src/main/scala/scalang/epmd/Epmd.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import java.util.concurrent.Callable
2525

2626
object Epmd {
2727
val defaultPort = 4369
28-
lazy val bossPool = ThreadPool.instrumentedElastic("scalang.epmd", "boss", 1, 20)
29-
lazy val workerPool = ThreadPool.instrumentedElastic("scalang.epmd", "worker", 1, 20)
28+
lazy val bossPool = ThreadPool.instrumentedFixed("scalang.epmd", "boss", 20)
29+
lazy val workerPool = ThreadPool.instrumentedFixed("scalang.epmd", "worker", 20)
3030

3131
def apply(host : String) : Epmd = {
3232
val port = Option(System.getenv("ERL_EPMD_PORT")).map(_.toInt).getOrElse(defaultPort)

src/main/scala/scalang/util/ThreadPoolFactory.scala

+5-6
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ class DefaultThreadPoolFactory extends ThreadPoolFactory {
5858
lazy val workerPool = new OrderedMemoryAwareThreadPoolExecutor(max_threads, 524288000, 524288000, 1000, TimeUnit.SECONDS, new NamedThreadFactory("worker"))
5959
lazy val actorPool = new MemoryAwareThreadPoolExecutor(max_threads, 100, 100, 1000, TimeUnit.SECONDS, StupidObjectSizeEstimator, new NamedThreadFactory("actor"))
6060
**/
61-
lazy val bossPool = ThreadPool.instrumentedElastic("scalang", "boss", 2, max_threads)
62-
lazy val workerPool = ThreadPool.instrumentedElastic("scalang", "worker", 2, max_threads)
61+
lazy val bossPool = ThreadPool.instrumentedFixed("scalang", "boss", max_threads)
62+
lazy val workerPool = ThreadPool.instrumentedFixed("scalang", "worker", max_threads)
6363
lazy val executorPool = new OrderedMemoryAwareThreadPoolExecutor(max_threads, max_memory, max_memory, 1000, TimeUnit.SECONDS, new NamedThreadFactory("executor"))
64-
lazy val actorPool = ThreadPool.instrumentedElastic("scalang", "actor", 2, max_threads)
64+
lazy val actorPool = ThreadPool.instrumentedFixed("scalang", "actor", max_threads)
6565
lazy val batchExecutor = new BatchExecutorImpl
6666

6767
val poolNameCounter = new AtomicInteger(0)
@@ -84,9 +84,8 @@ class DefaultThreadPoolFactory extends ThreadPoolFactory {
8484

8585
def createBatchExecutor(name : String, reentrant : Boolean) : BatchExecutor = {
8686
if (reentrant) {
87-
val queue = new ElasticBlockingQueue[Runnable]
88-
val pool = new BatchPoolExecutor("scalang", name, 1, max_threads, 60l, TimeUnit.SECONDS, queue, new NamedThreadFactory(name))
89-
queue.executor = pool
87+
val queue = new LinkedBlockingQueue[Runnable]
88+
val pool = new BatchPoolExecutor("scalang", name, max_threads, max_threads, 60l, TimeUnit.SECONDS, queue, new NamedThreadFactory(name))
9089
pool
9190
} else {
9291
batchExecutor

0 commit comments

Comments
 (0)