Merge branch 'master' of git://github.com/sergiob/akka

This commit is contained in:
jboner 2009-07-27 21:51:22 +02:00
commit bf50299629

View file

@ -11,133 +11,151 @@ import org.junit.Assert._
import junit.framework.TestCase
class EventBasedThreadPoolDispatcherTest extends TestCase {
private var threadingIssueDetected: AtomicBoolean = null
private var threadingIssueDetected: AtomicBoolean = null
@Before
override def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Before
override def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Test
def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
}
@Test
def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
}
@Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
@Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key, new MessageInvoker {
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
} finally {
guardLock.unlock
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key, new MessageInvoker {
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
} finally {
guardLock.unlock
}
}
})
dispatcher.start
for (i <- 0 until 10) {
dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
}
}
})
dispatcher.start
for (i <- 0 until 10) {
dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.start
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None))
handlersBarrier.await(5, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
val guardLock1 = new ReentrantLock
val guardLock2 = new ReentrantLock
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {
if (guardLock1.tryLock) {
handlersBarrier.await(1, TimeUnit.SECONDS)
} else {
threadingIssueDetected.set(true);
}
}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {
if (guardLock2.tryLock) {
handlersBarrier.await(1, TimeUnit.SECONDS)
} else {
threadingIssueDetected.set(true);
}}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.start
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None))
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
handlersBarrier.await(5, TimeUnit.SECONDS)
handlersBarrier.await(5, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
}