ExecutorBasedEventDrivenDispatcher now works and unit tests are added
This commit is contained in:
parent
28ad8210b4
commit
aab2cd6697
3 changed files with 61 additions and 6 deletions
|
|
@ -72,7 +72,7 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
|
||||
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
|
||||
|
||||
|
||||
//FIXME remove this from ThreadPoolBuilder
|
||||
mailboxCapacity = mailboxConfig.capacity
|
||||
|
||||
@volatile private var active: Boolean = false
|
||||
|
|
@ -81,18 +81,18 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
init
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = {
|
||||
getMailbox(invocation.receiver).add(invocation)
|
||||
getMailbox(invocation.receiver) enqueue invocation
|
||||
dispatch(invocation.receiver)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the mailbox associated with the actor
|
||||
*/
|
||||
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Queue[MessageInvocation]]
|
||||
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue]
|
||||
|
||||
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
|
||||
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity)
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false)
|
||||
|
||||
def dispatch(receiver: ActorRef): Unit = if (active) {
|
||||
|
||||
|
|
@ -131,12 +131,12 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
def processMailbox(receiver: ActorRef): Boolean = {
|
||||
var processedMessages = 0
|
||||
val mailbox = getMailbox(receiver)
|
||||
var messageInvocation = mailbox.poll
|
||||
var messageInvocation = mailbox.dequeue
|
||||
while (messageInvocation != null) {
|
||||
messageInvocation.invoke
|
||||
processedMessages += 1
|
||||
// check if we simply continue with other messages, or reached the throughput limit
|
||||
if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.poll
|
||||
if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.dequeue
|
||||
else {
|
||||
messageInvocation = null
|
||||
return !mailbox.isEmpty
|
||||
|
|
|
|||
|
|
@ -65,6 +65,8 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
|
|||
trait MessageQueue {
|
||||
def enqueue(handle: MessageInvocation)
|
||||
def dequeue(): MessageInvocation
|
||||
def size: Int
|
||||
def isEmpty: Boolean
|
||||
}
|
||||
|
||||
/* Tells the dispatcher that it should create a bounded mailbox with the specified push timeout
|
||||
|
|
|
|||
53
akka-actor/src/test/scala/dispatch/MailboxConfigSpec.scala
Normal file
53
akka-actor/src/test/scala/dispatch/MailboxConfigSpec.scala
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
package se.scalablesolutions.akka.actor.dispatch
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
import Actor._
|
||||
import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}
|
||||
import se.scalablesolutions.akka.util.Duration
|
||||
import se.scalablesolutions.akka.dispatch.{MessageQueueAppendFailedException, MessageInvocation, MailboxConfig, Dispatchers}
|
||||
import java.util.concurrent.atomic.{AtomicReference}
|
||||
|
||||
object MailboxConfigSpec {
|
||||
|
||||
}
|
||||
|
||||
class MailboxConfigSpec extends JUnitSuite {
|
||||
import MailboxConfigSpec._
|
||||
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
@Test def shouldCreateUnboundedQueue = {
|
||||
val m = MailboxConfig(-1,None,false)
|
||||
assert(m.newMailbox().asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === Integer.MAX_VALUE)
|
||||
}
|
||||
|
||||
@Test def shouldCreateBoundedQueue = {
|
||||
val m = MailboxConfig(1,None,false)
|
||||
assert(m.newMailbox().asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === 1)
|
||||
}
|
||||
|
||||
@Test(expected = classOf[MessageQueueAppendFailedException]) def shouldThrowMessageQueueAppendFailedExceptionWhenTimeOutEnqueue = {
|
||||
val m = MailboxConfig(1,Some(Duration(1,unit)),false)
|
||||
val testActor = actorOf( new Actor { def receive = { case _ => }} )
|
||||
val mbox = m.newMailbox()
|
||||
(1 to 10000) foreach { i => mbox.enqueue(new MessageInvocation(testActor,i,None,None,None)) }
|
||||
}
|
||||
|
||||
|
||||
@Test def shouldBeAbleToDequeueUnblocking = {
|
||||
val m = MailboxConfig(1,Some(Duration(1,unit)),false)
|
||||
val mbox = m.newMailbox()
|
||||
val latch = new CountDownLatch(1)
|
||||
val t = new Thread { override def run = {
|
||||
mbox.dequeue
|
||||
latch.countDown
|
||||
}}
|
||||
t.start
|
||||
val result = latch.await(5000,unit)
|
||||
if (!result)
|
||||
t.interrupt
|
||||
assert(result === true)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue