diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 4962962f09..dbbfe3442a 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -72,7 +72,7 @@ class ExecutorBasedEventDrivenDispatcher( def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage - + //FIXME remove this from ThreadPoolBuilder mailboxCapacity = mailboxConfig.capacity @volatile private var active: Boolean = false @@ -81,18 +81,18 @@ class ExecutorBasedEventDrivenDispatcher( init def dispatch(invocation: MessageInvocation) = { - getMailbox(invocation.receiver).add(invocation) + getMailbox(invocation.receiver) enqueue invocation dispatch(invocation.receiver) } /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Queue[MessageInvocation]] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity) + override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false) def dispatch(receiver: ActorRef): Unit = if (active) { @@ -131,12 +131,12 @@ class ExecutorBasedEventDrivenDispatcher( def processMailbox(receiver: ActorRef): Boolean = { var processedMessages = 0 val mailbox = getMailbox(receiver) - var messageInvocation = mailbox.poll + var messageInvocation = mailbox.dequeue while (messageInvocation != null) { messageInvocation.invoke processedMessages += 1 // check if we simply continue with other messages, or reached the throughput limit - if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.poll + if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.dequeue else { messageInvocation = null return !mailbox.isEmpty diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 26ca78d379..383c58905a 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -65,6 +65,8 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m trait MessageQueue { def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation + def size: Int + def isEmpty: Boolean } /* Tells the dispatcher that it should create a bounded mailbox with the specified push timeout diff --git a/akka-actor/src/test/scala/dispatch/MailboxConfigSpec.scala b/akka-actor/src/test/scala/dispatch/MailboxConfigSpec.scala new file mode 100644 index 0000000000..27afdbbce6 --- /dev/null +++ b/akka-actor/src/test/scala/dispatch/MailboxConfigSpec.scala @@ -0,0 +1,53 @@ +package se.scalablesolutions.akka.actor.dispatch + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import se.scalablesolutions.akka.actor.Actor +import Actor._ +import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.util.Duration +import se.scalablesolutions.akka.dispatch.{MessageQueueAppendFailedException, MessageInvocation, MailboxConfig, Dispatchers} +import java.util.concurrent.atomic.{AtomicReference} + +object MailboxConfigSpec { + +} + +class MailboxConfigSpec extends JUnitSuite { + import MailboxConfigSpec._ + + private val unit = TimeUnit.MILLISECONDS + + @Test def shouldCreateUnboundedQueue = { + val m = MailboxConfig(-1,None,false) + assert(m.newMailbox().asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === Integer.MAX_VALUE) + } + + @Test def shouldCreateBoundedQueue = { + val m = MailboxConfig(1,None,false) + assert(m.newMailbox().asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === 1) + } + + @Test(expected = classOf[MessageQueueAppendFailedException]) def shouldThrowMessageQueueAppendFailedExceptionWhenTimeOutEnqueue = { + val m = MailboxConfig(1,Some(Duration(1,unit)),false) + val testActor = actorOf( new Actor { def receive = { case _ => }} ) + val mbox = m.newMailbox() + (1 to 10000) foreach { i => mbox.enqueue(new MessageInvocation(testActor,i,None,None,None)) } + } + + + @Test def shouldBeAbleToDequeueUnblocking = { + val m = MailboxConfig(1,Some(Duration(1,unit)),false) + val mbox = m.newMailbox() + val latch = new CountDownLatch(1) + val t = new Thread { override def run = { + mbox.dequeue + latch.countDown + }} + t.start + val result = latch.await(5000,unit) + if (!result) + t.interrupt + assert(result === true) + } +}