diff --git a/akka.iws b/akka.iws index 149ba860bc..4c7eedd34c 100644 --- a/akka.iws +++ b/akka.iws @@ -2,14 +2,14 @@ - - - + + - - - + + + + @@ -142,7 +142,7 @@ - + @@ -202,28 +202,28 @@ - - + + - + - - + + - + - - + + - + @@ -1337,7 +1337,7 @@ - + @@ -1439,7 +1439,7 @@ - + @@ -1486,23 +1486,23 @@ - + - + - + - + - + - + diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index 872f1be997..7f83da11bd 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -96,7 +96,7 @@ sealed class TransactionalAroundAdvice(target: Class[_], server.transactionalVectors = vectors import kernel.reactor._ - private[this] var dispatcher = new ProxyMessageDispatcher + private[this] var dispatcher = new ProxyDispatcher private[this] var mailbox = dispatcher.messageQueue dispatcher.start diff --git a/kernel/src/main/scala/Actor.scala b/kernel/src/main/scala/Actor.scala index 941dfff9ab..cd069224eb 100644 --- a/kernel/src/main/scala/Actor.scala +++ b/kernel/src/main/scala/Actor.scala @@ -120,9 +120,9 @@ trait Actor { if (!isRunning) { dispatcherType match { case EventBased => - dispatcher = new EventBasedDispatcher + dispatcher = new EventBasedSingleThreadDispatcher case ThreadBased => - dispatcher = new ThreadBasedDispatcher + dispatcher = new EventBasedThreadPoolDispatcher } mailbox = dispatcher.messageQueue dispatcher.registerHandler(this, new ActorMessageHandler(this)) diff --git a/kernel/src/main/scala/reactor/EventBasedDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala similarity index 79% rename from kernel/src/main/scala/reactor/EventBasedDispatcher.scala rename to kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala index ba6aa5d643..af987a9d72 100644 --- a/kernel/src/main/scala/reactor/EventBasedDispatcher.scala +++ b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala @@ -10,10 +10,10 @@ */ package se.scalablesolutions.akka.kernel.reactor -class EventBasedDispatcher extends MessageDispatcherBase { +class EventBasedSingleThreadDispatcher extends MessageDispatcherBase { def start = if (!active) { active = true - val messageDemultiplexer = new EventBasedDemultiplexer(messageQueue) + val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(messageQueue) selectorThread = new Thread { override def run = { while (active) { @@ -34,7 +34,7 @@ class EventBasedDispatcher extends MessageDispatcherBase { } } -class EventBasedDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { +class EventBasedSingleThreadDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { import java.util.{LinkedList, Queue} private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle] @@ -43,7 +43,7 @@ class EventBasedDemultiplexer(private val messageQueue: MessageQueue) extends Me def acquireSelectedQueue: Queue[MessageHandle] = selectedQueue - def releaseSelectedQueue = throw new UnsupportedOperationException("EventBasedDemultiplexer can't release its queue") + def releaseSelectedQueue = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't release its queue") - def wakeUp = throw new UnsupportedOperationException("EventBasedDemultiplexer can't be woken up") + def wakeUp = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't be woken up") } diff --git a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala similarity index 92% rename from kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala rename to kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala index c11509a2eb..ec4f7711af 100644 --- a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala +++ b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala @@ -10,7 +10,7 @@ */ package se.scalablesolutions.akka.kernel.reactor -class ThreadBasedDispatcher extends MessageDispatcherBase { +class EventBasedThreadPoolDispatcher extends MessageDispatcherBase { import java.util.concurrent.Executors import java.util.HashSet @@ -22,7 +22,7 @@ class ThreadBasedDispatcher extends MessageDispatcherBase { def start = if (!active) { active = true - val messageDemultiplexer = new ThreadBasedDemultiplexer(messageQueue) + val messageDemultiplexer = new EventBasedThreadPoolDemultiplexer(messageQueue) selectorThread = new Thread { //val enqued = new LinkedList[MessageHandle] override def run = { @@ -74,7 +74,7 @@ class ThreadBasedDispatcher extends MessageDispatcherBase { private def free(key: AnyRef) = synchronized { busyHandlers.remove(key) } } -class ThreadBasedDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { +class EventBasedThreadPoolDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { import java.util.concurrent.locks.ReentrantLock import java.util.{LinkedList, Queue} diff --git a/kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala b/kernel/src/main/scala/reactor/ProxyDispatcher.scala similarity index 91% rename from kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala rename to kernel/src/main/scala/reactor/ProxyDispatcher.scala index aec34ce6f1..16a2cd8292 100644 --- a/kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala +++ b/kernel/src/main/scala/reactor/ProxyDispatcher.scala @@ -10,7 +10,7 @@ */ package se.scalablesolutions.akka.kernel.reactor -class ProxyMessageDispatcher extends MessageDispatcherBase { +class ProxyDispatcher extends MessageDispatcherBase { import java.util.concurrent.Executors import java.util.HashSet import org.codehaus.aspectwerkz.joinpoint.JoinPoint @@ -22,7 +22,7 @@ class ProxyMessageDispatcher extends MessageDispatcherBase { def start = if (!active) { active = true - val messageDemultiplexer = new ProxyMessageDemultiplexer(messageQueue) + val messageDemultiplexer = new ProxyDemultiplexer(messageQueue) selectorThread = new Thread { override def run = { while (active) { @@ -58,7 +58,7 @@ class ProxyMessageDispatcher extends MessageDispatcherBase { override protected def doShutdown = handlerExecutor.shutdownNow } -class ProxyMessageDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { +class ProxyDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { import java.util.concurrent.locks.ReentrantLock import java.util.{LinkedList, Queue} diff --git a/kernel/src/test/scala/EventBasedDispatcherTest.scala b/kernel/src/test/scala/EventBasedDispatcherTest.scala index b6a8cbf18f..adf01c5733 100644 --- a/kernel/src/test/scala/EventBasedDispatcherTest.scala +++ b/kernel/src/test/scala/EventBasedDispatcherTest.scala @@ -57,7 +57,7 @@ class EventBasedDispatcherTest { val guardLock = new ReentrantLock val handleLatch = new CountDownLatch(10) val key = "key" - val dispatcher = new EventBasedDispatcher + val dispatcher = new EventBasedSingleThreadDispatcher dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 10) { @@ -71,7 +71,7 @@ class EventBasedDispatcherTest { val handleLatch = new CountDownLatch(2) val key1 = "key1" val key2 = "key2" - val dispatcher = new EventBasedDispatcher + val dispatcher = new EventBasedSingleThreadDispatcher dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch)) dispatcher.start @@ -85,7 +85,7 @@ class EventBasedDispatcherTest { val handleLatch = new CountDownLatch(200) val key1 = "key1" val key2 = "key2" - val dispatcher = new EventBasedDispatcher + val dispatcher = new EventBasedSingleThreadDispatcher dispatcher.registerHandler(key1, new MessageHandler { var currentValue = -1; def handle(message: MessageHandle) { diff --git a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala index 4bb33c1478..139bb8cbbc 100644 --- a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala +++ b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala @@ -39,7 +39,7 @@ class ThreadBasedDispatcherTest { val guardLock = new ReentrantLock val handleLatch = new CountDownLatch(100) val key = "key" - val dispatcher = new ThreadBasedDispatcher + val dispatcher = new EventBasedThreadPoolDispatcher dispatcher.registerHandler(key, new MessageHandler { def handle(message: MessageHandle) { try { @@ -68,7 +68,7 @@ class ThreadBasedDispatcherTest { val handlersBarrier = new CyclicBarrier(3) val key1 = "key1" val key2 = "key2" - val dispatcher = new ThreadBasedDispatcher + val dispatcher = new EventBasedThreadPoolDispatcher dispatcher.registerHandler(key1, new MessageHandler { def handle(message: MessageHandle) = synchronized { try {handlersBarrier.await(1, TimeUnit.SECONDS)} @@ -93,7 +93,7 @@ class ThreadBasedDispatcherTest { val handleLatch = new CountDownLatch(200) val key1 = "key1" val key2 = "key2" - val dispatcher = new ThreadBasedDispatcher + val dispatcher = new EventBasedThreadPoolDispatcher dispatcher.registerHandler(key1, new MessageHandler { var currentValue = -1; def handle(message: MessageHandle) {