Refactored ExecutableMailbox to make it accessible for other implementations

This commit is contained in:
Jonas Bonér 2010-09-29 10:30:05 +02:00
parent bd95d20edb
commit 5e0dfea15c
5 changed files with 109 additions and 75 deletions

View file

@ -5,6 +5,7 @@
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import java.util.Queue
import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
@ -84,77 +85,25 @@ class ExecutorBasedEventDrivenDispatcher(
val mailboxType = Some(_mailboxType)
@volatile private var active = false
@volatile private[akka] var active = false
val name = "akka:event-driven:dispatcher:" + _name
init
/**
* This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox
*/
trait ExecutableMailbox extends Runnable { self: MessageQueue =>
final def run = {
val reschedule = try {
processMailbox()
} finally {
dispatcherLock.unlock()
}
if (reschedule || !self.isEmpty) registerForExecution(self)
}
/**
* Process the messages in the mailbox
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox(): Boolean = {
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && throughputDeadlineTime > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
do {
nextMessage.invoke
if (nextMessage.receiver.isBeingRestarted)
return !self.isEmpty
if (throttle) { // Will be elided when false
processedMessages += 1
if ((processedMessages >= throughput) ||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= throughputDeadlineTime)) // If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
} while (nextMessage ne null)
}
false
}
}
def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
registerForExecution(mbox)
mbox.registerForExecution
}
def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) {
if (mailbox.dispatcherLock.tryLock()) {
try {
executor execute mailbox
} catch {
case e: RejectedExecutionException =>
mailbox.dispatcherLock.unlock()
throw e
}
}
} else log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox)
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
private def getMailbox(receiver: ActorRef) = {
val mb = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
mb.register(this)
mb
}
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
@ -166,6 +115,19 @@ class ExecutorBasedEventDrivenDispatcher(
new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox
}
/**
* Creates and returns a durable mailbox for the given actor.
*/
def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match {
// FIXME make generic (work for TypedActor as well)
case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported")
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}
def start = if (!active) {
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
active = true
@ -191,4 +153,69 @@ class ExecutorBasedEventDrivenDispatcher(
config(this)
buildThreadPool
}
}
}
/**
* This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox.
*/
trait ExecutableMailbox extends Runnable { self: MessageQueue =>
private var _dispatcher: Option[ExecutorBasedEventDrivenDispatcher] = None
def register(md: ExecutorBasedEventDrivenDispatcher) = _dispatcher = Some(md)
def dispatcher: ExecutorBasedEventDrivenDispatcher = _dispatcher.getOrElse(
throw new IllegalActorStateException("mailbox.register(dispatcher) has not been invoked"))
final def run = {
val reschedule = try {
processMailbox()
} finally {
dispatcherLock.unlock()
}
if (reschedule || !self.isEmpty) registerForExecution
}
/**
* Process the messages in the mailbox
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox(): Boolean = {
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = dispatcher.throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
do {
nextMessage.invoke
if (nextMessage.receiver.isBeingRestarted)
return !self.isEmpty
if (throttle) { // Will be elided when false
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) ||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
} while (nextMessage ne null)
}
false
}
def registerForExecution: Unit = if (dispatcher.active) {
if (dispatcherLock.tryLock()) {
try {
dispatcher.execute(this)
} catch {
case e: RejectedExecutionException =>
dispatcherLock.unlock()
throw e
}
}
} else dispatcher.log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, this)
}