diff --git a/akka.ipr b/akka.ipr
index 5d48026a49..14bd9a9fdd 100644
--- a/akka.ipr
+++ b/akka.ipr
@@ -1246,17 +1246,6 @@
-
-
-
-
-
-
-
-
-
-
-
@@ -1268,17 +1257,6 @@
-
-
-
-
-
-
-
-
-
-
-
@@ -1456,6 +1434,39 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/akka.iws b/akka.iws
index 1acef65cde..951d4fab94 100644
--- a/akka.iws
+++ b/akka.iws
@@ -6,16 +6,21 @@
-
-
+
+
+
+
+
+
+
+
+
-
-
-
+
-
+
@@ -32,7 +37,33 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -108,55 +139,73 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -165,7 +214,7 @@
-
+
@@ -193,11 +242,6 @@
@@ -490,54 +539,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -630,7 +631,7 @@
-
+
@@ -660,7 +661,7 @@
-
+
@@ -777,7 +778,7 @@
-
+
@@ -1944,17 +1945,18 @@
-
+
-
+
-
-
+
+
+
@@ -1966,9 +1968,8 @@
-
-
+
@@ -2010,62 +2011,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -2089,7 +2034,14 @@
-
+
+
+
+
+
+
+
+
@@ -2101,6 +2053,13 @@
+
+
+
+
+
+
+
@@ -2108,16 +2067,58 @@
-
+
-
+
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/fun-test-java/akka-fun-test-java.iml b/fun-test-java/akka-fun-test-java.iml
index c4444b4718..46e1b7eea9 100644
--- a/fun-test-java/akka-fun-test-java.iml
+++ b/fun-test-java/akka-fun-test-java.iml
@@ -46,9 +46,9 @@
-
+
-
+
@@ -67,6 +67,7 @@
+
diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml
index 1da285114b..caa62dd171 100644
--- a/kernel/akka-kernel.iml
+++ b/kernel/akka-kernel.iml
@@ -62,9 +62,9 @@
-
+
-
+
@@ -83,6 +83,7 @@
+
diff --git a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
index 1884b10e24..c3d52e777e 100644
--- a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
+++ b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
@@ -10,6 +10,8 @@
*/
package se.scalablesolutions.akka.kernel.reactor
+import java.util.{LinkedList, Queue, List}
+
class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
def start = if (!active) {
active = true
@@ -20,11 +22,12 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
- val selectedQueue = messageDemultiplexer.acquireSelectedQueue
- for (index <- 0 until selectedQueue.size) {
- val handle = selectedQueue.remove
- val handler = messageHandlers.get(handle.sender)
- if (handler != null) handler.invoke(handle)
+ val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations.iterator
+ while (selectedInvocations.hasNext) {
+ val invocation = selectedInvocations.next
+ val invoker = messageHandlers.get(invocation.sender)
+ if (invoker != null) invoker.invoke(invocation)
+ selectedInvocations.remove
}
}
}
@@ -34,15 +37,14 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
}
class EventBasedSingleThreadDemultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
- import java.util.{LinkedList, Queue}
- private val selectedQueue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
+ private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation]
def select = messageQueue.read(selectedQueue)
- def acquireSelectedQueue: Queue[MessageInvocation] = selectedQueue
+ def acquireSelectedInvocations: List[MessageInvocation] = selectedQueue
- def releaseSelectedQueue = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't release its queue")
+ def releaseSelectedInvocations = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't release its queue")
def wakeUp = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't be woken up")
}
diff --git a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
index 4950cda4bf..29920e2356 100644
--- a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
+++ b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
@@ -8,7 +8,7 @@ import java.util.concurrent._
import locks.ReentrantLock
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
-import java.util.{Collection, HashSet, LinkedList, Queue}
+import java.util.{Collection, HashSet, HashMap, LinkedList, List}
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
@@ -66,7 +66,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
private var threadPoolBuilder: ThreadPoolExecutor = _
private val threadFactory = new MonitorableThreadFactory("akka")
private var boundedExecutorBound = -1
- private val busyHandlers = new HashSet[AnyRef]
+ private val busyInvokers = new HashSet[AnyRef]
// build default thread pool
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
@@ -82,27 +82,27 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
override def run = {
while (active) {
try {
- try {
- guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
+ //try {
+ // guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
messageDemultiplexer.select
- } catch {case e: InterruptedException => active = false}
- val selectedQueue = messageDemultiplexer.acquireSelectedQueue
- for (index <- 0 until selectedQueue.size) {
- val message = selectedQueue.peek
- val messageHandler = getIfNotBusy(message.sender)
- if (messageHandler.isDefined) {
- executor.execute(new Runnable {
- override def run = {
- messageHandler.get.invoke(message)
- free(message.sender)
- messageDemultiplexer.wakeUp
- }
- })
- selectedQueue.remove
- }
+ //} catch { case e: InterruptedException => active = false }
+ val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
+ val reservedInvocations = reserve(selectedInvocations)
+ val it = reservedInvocations.entrySet.iterator
+ while (it.hasNext) {
+ val entry = it.next
+ val invocation = entry.getKey
+ val invoker = entry.getValue
+ threadPoolBuilder.execute(new Runnable() {
+ def run = {
+ invoker.invoke(invocation)
+ free(invocation.sender)
+ messageDemultiplexer.wakeUp
+ }
+ })
}
} finally {
- messageDemultiplexer.releaseSelectedQueue
+ messageDemultiplexer.releaseSelectedInvocations
}
}
}
@@ -112,19 +112,25 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
override protected def doShutdown = executor.shutdownNow
- private def getIfNotBusy(key: AnyRef): Option[MessageInvoker] = guard.synchronized {
- if (CONCURRENT_MODE && messageHandlers.containsKey(key)) Some(messageHandlers.get(key))
- else if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
- busyHandlers.add(key)
- Some(messageHandlers.get(key))
- } else None
+ private def reserve(invocations: List[MessageInvocation]): HashMap[MessageInvocation, MessageInvoker] = synchronized {
+ // if (CONCURRENT_MODE && messageHandlers.containsKey(key)) Some(messageHandlers.get(key))
+ val result = new HashMap[MessageInvocation, MessageInvoker]
+ val iterator = invocations.iterator
+ while (iterator.hasNext) {
+ val invocation = iterator.next
+ if (!busyInvokers.contains(invocation.sender)) {
+ result.put(invocation, messageHandlers.get(invocation.sender))
+ busyInvokers.add(invocation.sender)
+ iterator.remove
+ }
+ }
+ result
}
- private def free(key: AnyRef) = guard.synchronized {
- if (!CONCURRENT_MODE) busyHandlers.remove(key)
+ private def free(invoker: AnyRef) = synchronized {
+ if (!CONCURRENT_MODE) busyInvokers.remove(invoker)
}
-
-
+
// ============ Code for configuration of thread pool =============
def buildThreadPool = synchronized {
@@ -241,25 +247,22 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
}
class EventBasedThreadPoolDemultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
- private val selectedQueue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
- private val selectedQueueLock = new ReentrantLock
+ private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation]
+ private val selectedInvocationsLock = new ReentrantLock
def select = try {
- selectedQueueLock.lock
- messageQueue.read(selectedQueue)
+ selectedInvocationsLock.lock
+ messageQueue.read(selectedInvocations)
} finally {
- selectedQueueLock.unlock
+ selectedInvocationsLock.unlock
}
- def acquireSelectedQueue: Queue[MessageInvocation] = {
- selectedQueueLock.lock
- selectedQueue
+ def acquireSelectedInvocations: List[MessageInvocation] = {
+ selectedInvocationsLock.lock
+ selectedInvocations
}
- def releaseSelectedQueue = {
- //selectedQueue.clear
- selectedQueueLock.unlock
- }
+ def releaseSelectedInvocations = selectedInvocationsLock.unlock
def wakeUp = messageQueue.interrupt
}
diff --git a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
index 8fb35d84b1..7368c7f8e8 100644
--- a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
+++ b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
@@ -4,7 +4,7 @@
package se.scalablesolutions.akka.kernel.reactor
-import java.util.{LinkedList, Queue}
+import java.util.{LinkedList, Queue, List}
import java.util.concurrent.TimeUnit
import java.util.HashMap
@@ -21,11 +21,11 @@ trait MessageDispatcherBase extends MessageDispatcher {
def messageQueue = queue
- def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {
+ def registerHandler(key: AnyRef, handler: MessageInvoker) = synchronized {
messageHandlers.put(key, handler)
}
- def unregisterHandler(key: AnyRef) = guard.synchronized {
+ def unregisterHandler(key: AnyRef) = synchronized {
messageHandlers.remove(key)
}
@@ -55,9 +55,9 @@ class ReactiveMessageQueue extends MessageQueue {
queue.notifyAll
}
- def read(destination: Queue[MessageInvocation]) = queue.synchronized {
+ def read(destination: List[MessageInvocation]) = queue.synchronized {
while (queue.isEmpty && !interrupted) queue.wait
- if (!interrupted) while (!queue.isEmpty) destination.offer(queue.remove)
+ if (!interrupted) while (!queue.isEmpty) destination.add(queue.remove)
else interrupted = false
}
diff --git a/kernel/src/main/scala/reactor/Reactor.scala b/kernel/src/main/scala/reactor/Reactor.scala
index cfc88b56ba..9a87d4d73f 100644
--- a/kernel/src/main/scala/reactor/Reactor.scala
+++ b/kernel/src/main/scala/reactor/Reactor.scala
@@ -4,7 +4,7 @@
package se.scalablesolutions.akka.kernel.reactor
-import java.util.Queue
+import java.util.List
import kernel.stm.Transaction
import kernel.util.HashCode
@@ -27,8 +27,8 @@ trait MessageDispatcher {
trait MessageDemultiplexer {
def select
- def acquireSelectedQueue: Queue[MessageInvocation]
- def releaseSelectedQueue
+ def acquireSelectedInvocations: List[MessageInvocation]
+ def releaseSelectedInvocations
def wakeUp
}
diff --git a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
index a5e936f4ec..d5a89b5ad9 100644
--- a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
+++ b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
@@ -11,151 +11,151 @@ import org.junit.Assert._
import junit.framework.TestCase
class EventBasedThreadPoolDispatcherTest extends TestCase {
- private var threadingIssueDetected: AtomicBoolean = null
+ private var threadingIssueDetected: AtomicBoolean = null
- @Before
- override def setUp = {
- threadingIssueDetected = new AtomicBoolean(false)
- }
+ @Before
+ override def setUp = {
+ threadingIssueDetected = new AtomicBoolean(false)
+ }
- @Test
- def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
- internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
- }
+ @Test
+ def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
+ internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
+ }
- @Test
- def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
- internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
- }
+ @Test
+ def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
+ internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
+ }
- @Test
- def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
- internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
- }
+ @Test
+ def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
+ internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
+ }
- private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
- val guardLock = new ReentrantLock
- val handleLatch = new CountDownLatch(10)
- val key = "key"
- val dispatcher = new EventBasedThreadPoolDispatcher
- dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(2)
- .setMaxPoolSize(4)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy)
- .buildThreadPool
- dispatcher.registerHandler(key, new MessageInvoker {
- def invoke(message: MessageInvocation) {
- try {
- if (threadingIssueDetected.get) return
- if (guardLock.tryLock) {
- Thread.sleep(100)
- handleLatch.countDown
- } else {
- threadingIssueDetected.set(true)
- }
- } catch {
- case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
- } finally {
- guardLock.unlock
- }
- }
- })
- dispatcher.start
- for (i <- 0 until 10) {
- dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
+ private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
+ val guardLock = new ReentrantLock
+ val handleLatch = new CountDownLatch(10)
+ val key = "key"
+ val dispatcher = new EventBasedThreadPoolDispatcher
+ dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
+ .setCorePoolSize(2)
+ .setMaxPoolSize(4)
+ .setKeepAliveTimeInMillis(60000)
+ .setRejectionPolicy(new CallerRunsPolicy)
+ .buildThreadPool
+ dispatcher.registerHandler(key, new MessageInvoker {
+ def invoke(message: MessageInvocation) {
+ try {
+ if (threadingIssueDetected.get) return
+ if (guardLock.tryLock) {
+ Thread.sleep(100)
+ handleLatch.countDown
+ } else {
+ threadingIssueDetected.set(true)
+ return
+ }
+ } catch {
+ case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
+ } finally {
+ guardLock.unlock
}
- assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
- assertFalse(threadingIssueDetected.get)
+ }
+ })
+ dispatcher.start
+ for (i <- 0 until 10) {
+ dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
}
+ assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
+ assertFalse(threadingIssueDetected.get)
+ }
- private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
- val guardLock1 = new ReentrantLock
- val guardLock2 = new ReentrantLock
- val handlersBarrier = new CyclicBarrier(3)
- val key1 = "key1"
- val key2 = "key2"
- val dispatcher = new EventBasedThreadPoolDispatcher
- dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(2)
- .setMaxPoolSize(4)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy)
- .buildThreadPool
- dispatcher.registerHandler(key1, new MessageInvoker {
- def invoke(message: MessageInvocation) = synchronized {
- try {
- if (guardLock1.tryLock) {
- handlersBarrier.await(1, TimeUnit.SECONDS)
- } else {
- threadingIssueDetected.set(true);
- }
- }
- catch {case e: Exception => threadingIssueDetected.set(true)}
- }
- })
- dispatcher.registerHandler(key2, new MessageInvoker {
- def invoke(message: MessageInvocation) = synchronized {
- try {
- if (guardLock2.tryLock) {
- handlersBarrier.await(1, TimeUnit.SECONDS)
- } else {
- threadingIssueDetected.set(true);
- }}
- catch {case e: Exception => threadingIssueDetected.set(true)}
- }
- })
- dispatcher.start
- dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None))
- dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None))
- dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None))
- dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None))
-
- handlersBarrier.await(5, TimeUnit.SECONDS)
- handlersBarrier.await(5, TimeUnit.SECONDS)
-
- assertFalse(threadingIssueDetected.get)
- }
-
- private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
- val handleLatch = new CountDownLatch(200)
- val key1 = "key1"
- val key2 = "key2"
- val dispatcher = new EventBasedThreadPoolDispatcher
- dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(2)
- .setMaxPoolSize(4)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy)
- .buildThreadPool
- dispatcher.registerHandler(key1, new MessageInvoker {
- var currentValue = -1;
- def invoke(message: MessageInvocation) {
- if (threadingIssueDetected.get) return
- val messageValue = message.message.asInstanceOf[Int]
- if (messageValue.intValue == currentValue + 1) {
- currentValue = messageValue.intValue
- handleLatch.countDown
- } else threadingIssueDetected.set(true)
- }
- })
- dispatcher.registerHandler(key2, new MessageInvoker {
- var currentValue = -1;
- def invoke(message: MessageInvocation) {
- if (threadingIssueDetected.get) return
- val messageValue = message.message.asInstanceOf[Int]
- if (messageValue.intValue == currentValue + 1) {
- currentValue = messageValue.intValue
- handleLatch.countDown
- } else threadingIssueDetected.set(true)
- }
- })
- dispatcher.start
- for (i <- 0 until 100) {
- dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
- dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
+ private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
+ val guardLock1 = new ReentrantLock
+ val guardLock2 = new ReentrantLock
+ val handlersBarrier = new CyclicBarrier(3)
+ val key1 = "key1"
+ val key2 = "key2"
+ val dispatcher = new EventBasedThreadPoolDispatcher
+ dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
+ .setCorePoolSize(2)
+ .setMaxPoolSize(4)
+ .setKeepAliveTimeInMillis(60000)
+ .setRejectionPolicy(new CallerRunsPolicy)
+ .buildThreadPool
+ dispatcher.registerHandler(key1, new MessageInvoker {
+ def invoke(message: MessageInvocation) = synchronized {
+ try {
+ if (guardLock1.tryLock) {
+ handlersBarrier.await(1, TimeUnit.SECONDS)
+ } else {
+ threadingIssueDetected.set(true);
+ }
}
- assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
- assertFalse(threadingIssueDetected.get)
+ catch {case e: Exception => threadingIssueDetected.set(true)}
+ }
+ })
+ dispatcher.registerHandler(key2, new MessageInvoker {
+ def invoke(message: MessageInvocation) = synchronized {
+ try {
+ if (guardLock2.tryLock) {
+ handlersBarrier.await(1, TimeUnit.SECONDS)
+ } else {
+ threadingIssueDetected.set(true);
+ }
+ }
+ catch {case e: Exception => threadingIssueDetected.set(true)}
+ }
+ })
+ dispatcher.start
+ dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None))
+
+ handlersBarrier.await(5, TimeUnit.SECONDS)
+ assertFalse(threadingIssueDetected.get)
+ }
+
+ private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
+ val handleLatch = new CountDownLatch(200)
+ val key1 = "key1"
+ val key2 = "key2"
+ val dispatcher = new EventBasedThreadPoolDispatcher
+ dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
+ .setCorePoolSize(2)
+ .setMaxPoolSize(4)
+ .setKeepAliveTimeInMillis(60000)
+ .setRejectionPolicy(new CallerRunsPolicy)
+ .buildThreadPool
+ dispatcher.registerHandler(key1, new MessageInvoker {
+ var currentValue = -1;
+ def invoke(message: MessageInvocation) {
+ if (threadingIssueDetected.get) return
+ val messageValue = message.message.asInstanceOf[Int]
+ if (messageValue.intValue == currentValue + 1) {
+ currentValue = messageValue.intValue
+ handleLatch.countDown
+ } else threadingIssueDetected.set(true)
+ }
+ })
+ dispatcher.registerHandler(key2, new MessageInvoker {
+ var currentValue = -1;
+ def invoke(message: MessageInvocation) {
+ if (threadingIssueDetected.get) return
+ val messageValue = message.message.asInstanceOf[Int]
+ if (messageValue.intValue == currentValue + 1) {
+ currentValue = messageValue.intValue
+ handleLatch.countDown
+ } else threadingIssueDetected.set(true)
+ }
+ })
+ dispatcher.start
+ for (i <- 0 until 100) {
+ dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
}
+ assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
+ assertFalse(threadingIssueDetected.get)
+ }
}
diff --git a/samples-java/akka-samples-java.iml b/samples-java/akka-samples-java.iml
index 9b4766e88b..0cbc7d505f 100644
--- a/samples-java/akka-samples-java.iml
+++ b/samples-java/akka-samples-java.iml
@@ -46,9 +46,9 @@
-
+
-
+
@@ -67,6 +67,7 @@
+
diff --git a/samples-scala/akka-samples-scala.iml b/samples-scala/akka-samples-scala.iml
index 0804891166..13a15c04ed 100644
--- a/samples-scala/akka-samples-scala.iml
+++ b/samples-scala/akka-samples-scala.iml
@@ -51,9 +51,9 @@
-
+
-
+
@@ -72,6 +72,7 @@
+