Swapped out Scala Actors to a reactor based impl (still restart and linking to do) + finalized transactional persisted cassandra based vector (ref to go)

This commit is contained in:
Jonas Boner 2009-06-10 20:04:33 +02:00
parent 167b724671
commit ac52556595
22 changed files with 726 additions and 404 deletions

View file

@ -30,25 +30,24 @@ class ThreadBasedDispatcherTest {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
}
@Test
//@Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val handleLatch = new CountDownLatch(100)
val key = "key"
ThreadBasedDispatcher.registerHandler(key, new MessageHandler {
val dispatcher = new ThreadBasedDispatcher
dispatcher.registerHandler(key, new MessageHandler {
def handle(message: MessageHandle) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} else threadingIssueDetected.set(true)
} catch {
case e: Exception => threadingIssueDetected.set(true)
} finally {
@ -56,42 +55,46 @@ class ThreadBasedDispatcherTest {
}
}
})
ThreadBasedDispatcher.start
dispatcher.start
for (i <- 0 until 100) {
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult))
dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertTrue(handleLatch.await(5000, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
//dispatcher.shutdown
}
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
ThreadBasedDispatcher.registerHandler(key1, new MessageHandler {
val dispatcher = new ThreadBasedDispatcher
dispatcher.registerHandler(key1, new MessageHandler {
def handle(message: MessageHandle) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
ThreadBasedDispatcher.registerHandler(key2, new MessageHandler {
dispatcher.registerHandler(key2, new MessageHandler {
def handle(message: MessageHandle) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
ThreadBasedDispatcher.start
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key1, new Object, new NullFutureResult))
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key2, new Object, new NullFutureResult))
handlersBarrier.await(1, TimeUnit.SECONDS)
dispatcher.start
dispatcher.messageQueue.append(new MessageHandle(key1, "Sending Message 1", new NullFutureResult))
dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", new NullFutureResult))
handlersBarrier.await(5, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get)
//dispatcher.shutdown
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
ThreadBasedDispatcher.registerHandler(key1, new MessageHandler {
val dispatcher = new ThreadBasedDispatcher
dispatcher.registerHandler(key1, new MessageHandler {
var currentValue = -1;
def handle(message: MessageHandle) {
if (threadingIssueDetected.get) return
@ -102,23 +105,24 @@ class ThreadBasedDispatcherTest {
} else threadingIssueDetected.set(true)
}
})
ThreadBasedDispatcher.registerHandler(key2, new MessageHandler {
dispatcher.registerHandler(key2, new MessageHandler {
var currentValue = -1;
def handle(message: MessageHandle) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
currentValue = messageValue .intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
ThreadBasedDispatcher.start
dispatcher.start
for (i <- 0 until 100) {
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult))
ThreadBasedDispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult))
dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult))
dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertTrue(handleLatch.await(10, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
dispatcher.shutdown
}
}