diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index d36af50ad6..1203d32fde 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} +import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule import java.util.Queue import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} @@ -84,77 +85,25 @@ class ExecutorBasedEventDrivenDispatcher( val mailboxType = Some(_mailboxType) - @volatile private var active = false + @volatile private[akka] var active = false val name = "akka:event-driven:dispatcher:" + _name init - /** - * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox - */ - trait ExecutableMailbox extends Runnable { self: MessageQueue => - final def run = { - val reschedule = try { - processMailbox() - } finally { - dispatcherLock.unlock() - } - if (reschedule || !self.isEmpty) registerForExecution(self) - } - - /** - * Process the messages in the mailbox - * - * @return true if the processing finished before the mailbox was empty, due to the throughput constraint - */ - final def processMailbox(): Boolean = { - var nextMessage = self.dequeue - if (nextMessage ne null) { - val throttle = throughput > 0 - var processedMessages = 0 - val isDeadlineEnabled = throttle && throughputDeadlineTime > 0 - val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 - do { - nextMessage.invoke - - if (nextMessage.receiver.isBeingRestarted) - return !self.isEmpty - - if (throttle) { // Will be elided when false - processedMessages += 1 - if ((processedMessages >= throughput) || - (isDeadlineEnabled && (System.currentTimeMillis - started) >= throughputDeadlineTime)) // If we're throttled, break out - return !self.isEmpty - } - nextMessage = self.dequeue - } while (nextMessage ne null) - } - false - } - } - def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation - registerForExecution(mbox) + mbox.registerForExecution } - def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { - if (mailbox.dispatcherLock.tryLock()) { - try { - executor execute mailbox - } catch { - case e: RejectedExecutionException => - mailbox.dispatcherLock.unlock() - throw e - } - } - } else log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox) - /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] + private def getMailbox(receiver: ActorRef) = { + val mb = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] + mb.register(this) + mb + } override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size @@ -166,6 +115,19 @@ class ExecutorBasedEventDrivenDispatcher( new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox } + /** + * Creates and returns a durable mailbox for the given actor. + */ + def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { + // FIXME make generic (work for TypedActor as well) + case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported") + case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") + } + def start = if (!active) { log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) active = true @@ -191,4 +153,69 @@ class ExecutorBasedEventDrivenDispatcher( config(this) buildThreadPool } -} \ No newline at end of file +} + +/** + * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox. + */ +trait ExecutableMailbox extends Runnable { self: MessageQueue => + + private var _dispatcher: Option[ExecutorBasedEventDrivenDispatcher] = None + + def register(md: ExecutorBasedEventDrivenDispatcher) = _dispatcher = Some(md) + def dispatcher: ExecutorBasedEventDrivenDispatcher = _dispatcher.getOrElse( + throw new IllegalActorStateException("mailbox.register(dispatcher) has not been invoked")) + + final def run = { + val reschedule = try { + processMailbox() + } finally { + dispatcherLock.unlock() + } + if (reschedule || !self.isEmpty) registerForExecution + } + + /** + * Process the messages in the mailbox + * + * @return true if the processing finished before the mailbox was empty, due to the throughput constraint + */ + final def processMailbox(): Boolean = { + var nextMessage = self.dequeue + if (nextMessage ne null) { + val throttle = dispatcher.throughput > 0 + var processedMessages = 0 + val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 + val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 + do { + nextMessage.invoke + + if (nextMessage.receiver.isBeingRestarted) + return !self.isEmpty + + if (throttle) { // Will be elided when false + processedMessages += 1 + if ((processedMessages >= dispatcher.throughput) || + (isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out + return !self.isEmpty + } + nextMessage = self.dequeue + } while (nextMessage ne null) + } + false + } + + + def registerForExecution: Unit = if (dispatcher.active) { + if (dispatcherLock.tryLock()) { + try { + dispatcher.execute(this) + } catch { + case e: RejectedExecutionException => + dispatcherLock.unlock() + throw e + } + } + } else dispatcher.log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, this) +} + diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index a5ed113b97..eb949958c9 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -215,6 +215,19 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( } } + /** + * Creates and returns a durable mailbox for the given actor. + */ + protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { + // FIXME make generic (work for TypedActor as well) + case FileBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("FileBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") + case ZooKeeperBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("ZooKeeperBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") + case BeanstalkBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("BeanstalkBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") + case RedisBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("RedisBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") + case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") + case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher") + } + override def register(actorRef: ActorRef) = { verifyActorsAreOfSameType(actorRef) pooledActors add actorRef diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index 549765d9fe..cb96b39a3b 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -172,7 +172,12 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef] - override def toString = "HawtDispatchEventDrivenDispatcher" + /** + * Creates and returns a durable mailbox for the given actor. + */ + protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = null.asInstanceOf[AnyRef] + + override def toString = "HawtDispatcher" } class HawtDispatcherMailbox(val queue: DispatchQueue) { diff --git a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala index 360c8a143f..192313f178 100644 --- a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala @@ -101,15 +101,7 @@ trait MailboxFactory { protected def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef /** - * Creates and returns a durable mailbox for the given actor. + * Creates and returns a durable mailbox for the given actor. */ - protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { - // FIXME make generic (work for TypedActor as well) - case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue] - case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported") - case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") - } + protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef } \ No newline at end of file diff --git a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala index 7d886c7fb0..51c8d2bd73 100644 --- a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala @@ -18,9 +18,6 @@ import org.springframework.core.io.{ClassPathResource, Resource} import java.util.concurrent._ import se.scalablesolutions.akka.actor.{UntypedActor, Actor, ActorRef} - - - /** * Tests for spring configuration of typed actors. * @author michaelkober @@ -109,7 +106,7 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml") val dispatcher = context.getBean("hawt-dispatcher").asInstanceOf[HawtDispatcher] assert(dispatcher ne null) - assert(dispatcher.toString === "HawtDispatchEventDrivenDispatcher") + assert(dispatcher.toString === "HawtDispatcher") assert(dispatcher.aggregate === false) }