concurrent mode is now per-dispatcher

This commit is contained in:
jboner 2009-07-28 17:48:11 +02:00
parent 0ff677fe0b
commit 8b3a31ebde
7 changed files with 155 additions and 115 deletions

View file

@ -97,7 +97,7 @@ object Actor {
* </pre>
*/
protected[kernel] var dispatcher: MessageDispatcher = {
val dispatcher = new EventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
dispatcher

View file

@ -50,6 +50,7 @@ class DispatcherFactory {
* Has a fluent builder interface for configuring its semantics.
*/
def newEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher
def newConcurrentEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher(true)
/**
* Creates an event based dispatcher serving multiple (millions) of actors through a single thread.

View file

@ -56,11 +56,12 @@ import java.util.{Collection, HashSet, HashMap, LinkedList, List}
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extends MessageDispatcherBase {
def this() = this(false)
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
private var inProcessOfBuilding = false
private var executor: ExecutorService = _
private var threadPoolBuilder: ThreadPoolExecutor = _
@ -117,7 +118,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
val iterator = invocations.iterator
while (iterator.hasNext) {
val invocation = iterator.next
if (CONCURRENT_MODE) {
if (concurrentMode) {
val invoker = messageHandlers.get(invocation.sender)
if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null")
@ -135,7 +136,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
}
private def free(invoker: AnyRef) = guard.synchronized {
if (!CONCURRENT_MODE) busyInvokers.remove(invoker)
if (!concurrentMode) busyInvokers.remove(invoker)
}
// ============ Code for configuration of thread pool =============

View file

@ -9,9 +9,8 @@ import java.util.concurrent.TimeUnit
import java.util.HashMap
trait MessageDispatcherBase extends MessageDispatcher {
val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
//val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
val MILLISECONDS = TimeUnit.MILLISECONDS
val queue = new ReactiveMessageQueue
@volatile protected var active: Boolean = false

View file

@ -41,8 +41,6 @@ class MessageInvocation(val sender: AnyRef,
var result = HashCode.SEED
result = HashCode.hash(result, sender)
result = HashCode.hash(result, message)
result = if (future.isDefined) HashCode.hash(result, future.get) else result
result = if (tx.isDefined) HashCode.hash(result, tx.get.id) else result
result
}
@ -50,11 +48,7 @@ class MessageInvocation(val sender: AnyRef,
that != null &&
that.isInstanceOf[MessageInvocation] &&
that.asInstanceOf[MessageInvocation].sender == sender &&
that.asInstanceOf[MessageInvocation].message == message &&
that.asInstanceOf[MessageInvocation].future.isDefined == future.isDefined &&
that.asInstanceOf[MessageInvocation].future.get == future.get &&
that.asInstanceOf[MessageInvocation].tx.isDefined == tx.isDefined &&
that.asInstanceOf[MessageInvocation].tx.get.id == tx.get.id
that.asInstanceOf[MessageInvocation].message == message
override def toString(): String = "MessageInvocation[message = " + message + ", sender = " + sender + ", future = " + future + ", tx = " + tx + "]"
}

View file

@ -37,7 +37,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val dispatcher = new EventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@ -76,7 +76,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@ -121,7 +121,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)