diff --git a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala index ea3b83729e..a5e936f4ec 100644 --- a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala +++ b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala @@ -11,133 +11,151 @@ import org.junit.Assert._ import junit.framework.TestCase class EventBasedThreadPoolDispatcherTest extends TestCase { - private var threadingIssueDetected: AtomicBoolean = null + private var threadingIssueDetected: AtomicBoolean = null - @Before - override def setUp = { - threadingIssueDetected = new AtomicBoolean(false) - } + @Before + override def setUp = { + threadingIssueDetected = new AtomicBoolean(false) + } - @Test - def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = { - internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially - } + @Test + def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = { + internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially + } - @Test - def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = { - internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently - } + @Test + def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = { + internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently + } - @Test - def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = { - internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder - } + @Test + def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = { + internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder + } - private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = { - val guardLock = new ReentrantLock - val handleLatch = new CountDownLatch(10) - val key = "key" - val dispatcher = new EventBasedThreadPoolDispatcher - dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) - .setCorePoolSize(2) - .setMaxPoolSize(4) - .setKeepAliveTimeInMillis(60000) - .setRejectionPolicy(new CallerRunsPolicy) - .buildThreadPool - dispatcher.registerHandler(key, new MessageInvoker { - def invoke(message: MessageInvocation) { - try { - if (threadingIssueDetected.get) return - if (guardLock.tryLock) { - Thread.sleep(100) - handleLatch.countDown - } else { - threadingIssueDetected.set(true) - } - } catch { - case e: Exception => threadingIssueDetected.set(true); e.printStackTrace - } finally { - guardLock.unlock + private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = { + val guardLock = new ReentrantLock + val handleLatch = new CountDownLatch(10) + val key = "key" + val dispatcher = new EventBasedThreadPoolDispatcher + dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) + .setCorePoolSize(2) + .setMaxPoolSize(4) + .setKeepAliveTimeInMillis(60000) + .setRejectionPolicy(new CallerRunsPolicy) + .buildThreadPool + dispatcher.registerHandler(key, new MessageInvoker { + def invoke(message: MessageInvocation) { + try { + if (threadingIssueDetected.get) return + if (guardLock.tryLock) { + Thread.sleep(100) + handleLatch.countDown + } else { + threadingIssueDetected.set(true) + } + } catch { + case e: Exception => threadingIssueDetected.set(true); e.printStackTrace + } finally { + guardLock.unlock + } + } + }) + dispatcher.start + for (i <- 0 until 10) { + dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None)) } - } - }) - dispatcher.start - for (i <- 0 until 10) { - dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None)) + assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) + assertFalse(threadingIssueDetected.get) } - assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) - assertFalse(threadingIssueDetected.get) - } - private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = { - val handlersBarrier = new CyclicBarrier(3) - val key1 = "key1" - val key2 = "key2" - val dispatcher = new EventBasedThreadPoolDispatcher - dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) - .setCorePoolSize(2) - .setMaxPoolSize(4) - .setKeepAliveTimeInMillis(60000) - .setRejectionPolicy(new CallerRunsPolicy) - .buildThreadPool - dispatcher.registerHandler(key1, new MessageInvoker { - def invoke(message: MessageInvocation) = synchronized { - try {handlersBarrier.await(1, TimeUnit.SECONDS)} - catch {case e: Exception => threadingIssueDetected.set(true)} - } - }) - dispatcher.registerHandler(key2, new MessageInvoker { - def invoke(message: MessageInvocation) = synchronized { - try {handlersBarrier.await(1, TimeUnit.SECONDS)} - catch {case e: Exception => threadingIssueDetected.set(true)} - } - }) - dispatcher.start - dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None)) - handlersBarrier.await(5, TimeUnit.SECONDS) - assertFalse(threadingIssueDetected.get) - } + private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = { + val guardLock1 = new ReentrantLock + val guardLock2 = new ReentrantLock + val handlersBarrier = new CyclicBarrier(3) + val key1 = "key1" + val key2 = "key2" + val dispatcher = new EventBasedThreadPoolDispatcher + dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) + .setCorePoolSize(2) + .setMaxPoolSize(4) + .setKeepAliveTimeInMillis(60000) + .setRejectionPolicy(new CallerRunsPolicy) + .buildThreadPool + dispatcher.registerHandler(key1, new MessageInvoker { + def invoke(message: MessageInvocation) = synchronized { + try { + if (guardLock1.tryLock) { + handlersBarrier.await(1, TimeUnit.SECONDS) + } else { + threadingIssueDetected.set(true); + } + } + catch {case e: Exception => threadingIssueDetected.set(true)} + } + }) + dispatcher.registerHandler(key2, new MessageInvoker { + def invoke(message: MessageInvocation) = synchronized { + try { + if (guardLock2.tryLock) { + handlersBarrier.await(1, TimeUnit.SECONDS) + } else { + threadingIssueDetected.set(true); + }} + catch {case e: Exception => threadingIssueDetected.set(true)} + } + }) + dispatcher.start + dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None)) - private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = { - val handleLatch = new CountDownLatch(200) - val key1 = "key1" - val key2 = "key2" - val dispatcher = new EventBasedThreadPoolDispatcher - dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) - .setCorePoolSize(2) - .setMaxPoolSize(4) - .setKeepAliveTimeInMillis(60000) - .setRejectionPolicy(new CallerRunsPolicy) - .buildThreadPool - dispatcher.registerHandler(key1, new MessageInvoker { - var currentValue = -1; - def invoke(message: MessageInvocation) { - if (threadingIssueDetected.get) return - val messageValue = message.message.asInstanceOf[Int] - if (messageValue.intValue == currentValue + 1) { - currentValue = messageValue.intValue - handleLatch.countDown - } else threadingIssueDetected.set(true) - } - }) - dispatcher.registerHandler(key2, new MessageInvoker { - var currentValue = -1; - def invoke(message: MessageInvocation) { - if (threadingIssueDetected.get) return - val messageValue = message.message.asInstanceOf[Int] - if (messageValue.intValue == currentValue + 1) { - currentValue = messageValue.intValue - handleLatch.countDown - } else threadingIssueDetected.set(true) - } - }) - dispatcher.start - for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None)) + handlersBarrier.await(5, TimeUnit.SECONDS) + handlersBarrier.await(5, TimeUnit.SECONDS) + + assertFalse(threadingIssueDetected.get) + } + + private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = { + val handleLatch = new CountDownLatch(200) + val key1 = "key1" + val key2 = "key2" + val dispatcher = new EventBasedThreadPoolDispatcher + dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) + .setCorePoolSize(2) + .setMaxPoolSize(4) + .setKeepAliveTimeInMillis(60000) + .setRejectionPolicy(new CallerRunsPolicy) + .buildThreadPool + dispatcher.registerHandler(key1, new MessageInvoker { + var currentValue = -1; + def invoke(message: MessageInvocation) { + if (threadingIssueDetected.get) return + val messageValue = message.message.asInstanceOf[Int] + if (messageValue.intValue == currentValue + 1) { + currentValue = messageValue.intValue + handleLatch.countDown + } else threadingIssueDetected.set(true) + } + }) + dispatcher.registerHandler(key2, new MessageInvoker { + var currentValue = -1; + def invoke(message: MessageInvocation) { + if (threadingIssueDetected.get) return + val messageValue = message.message.asInstanceOf[Int] + if (messageValue.intValue == currentValue + 1) { + currentValue = messageValue.intValue + handleLatch.countDown + } else threadingIssueDetected.set(true) + } + }) + dispatcher.start + for (i <- 0 until 100) { + dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None)) + } + assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) + assertFalse(threadingIssueDetected.get) } - assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) - assertFalse(threadingIssueDetected.get) - } }