pekko/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala

225 lines
8.5 KiB
Scala
Raw Normal View History

/**
2009-12-27 16:01:53 +01:00
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import java.util.Queue
2010-10-11 12:42:02 +02:00
import se.scalablesolutions.akka.util.Switch
2010-10-22 17:50:48 +02:00
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent. {ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/**
* Default settings are:
* <pre/>
* - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
* - NR_START_THREADS = 16
* - NR_MAX_THREADS = 128
* - KEEP_ALIVE_TIME = 60000L // one minute
* </pre>
* <p/>
*
* The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
* There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
* <p/>
*
* Scala API.
* <p/>
* Example usage:
* <pre/>
* val dispatcher = new ExecutorBasedEventDrivenDispatcher("name")
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy)
* .buildThreadPool
* </pre>
* <p/>
*
* Java API.
* <p/>
* Example usage:
* <pre/>
* ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name");
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy())
* .buildThreadPool();
* </pre>
* <p/>
*
* But the preferred way of creating dispatchers is to use
2010-03-07 09:44:03 +01:00
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the
* mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
* always continues until the mailbox is empty.
2010-06-30 16:26:15 +02:00
* Larger values (or zero or negative) increase througput, smaller values increase fairness
*/
class ExecutorBasedEventDrivenDispatcher(
2010-08-21 16:13:16 +02:00
_name: String,
2010-09-09 17:34:05 +02:00
val throughput: Int = Dispatchers.THROUGHPUT,
2010-09-21 18:52:41 +02:00
val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
2010-10-22 17:50:48 +02:00
val config: ThreadPoolConfig = ThreadPoolConfig())
extends MessageDispatcher {
2010-09-21 18:52:41 +02:00
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
2010-10-22 17:50:48 +02:00
this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
2010-09-04 12:00:12 +02:00
2010-09-21 18:52:41 +02:00
def this(_name: String, throughput: Int, mailboxType: MailboxType) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
2010-08-21 16:13:16 +02:00
2010-09-21 18:52:41 +02:00
def this(_name: String, throughput: Int) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, _config: ThreadPoolConfig) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
2010-09-21 18:52:41 +02:00
def this(_name: String) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
2010-10-22 17:50:48 +02:00
val name = "akka:event-driven:dispatcher:" + _name
2010-09-21 18:52:41 +02:00
val mailboxType = Some(_mailboxType)
2010-10-22 17:50:48 +02:00
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
private[akka] def dispatch(invocation: MessageInvocation) = {
2010-09-11 15:24:09 +02:00
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
registerForExecution(mbox)
2010-09-11 15:24:09 +02:00
}
/**
* @return the mailbox associated with the actor
*/
2010-10-11 12:42:02 +02:00
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
2010-09-11 15:24:09 +02:00
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
2010-09-21 18:52:41 +02:00
def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match {
2010-10-11 12:42:02 +02:00
case UnboundedMailbox(blocking) => new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox {
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
2010-09-21 18:52:41 +02:00
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
2010-10-22 17:50:48 +02:00
new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) with ExecutableMailbox {
2010-10-11 12:42:02 +02:00
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
2010-09-12 11:24:53 +02:00
}
2010-09-11 15:24:09 +02:00
/**
* Creates and returns a durable mailbox for the given actor.
*/
def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match {
// FIXME make generic (work for TypedActor as well)
case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported")
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}
private[akka] def start = log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
private[akka] def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
2010-10-22 17:50:48 +02:00
if (old ne null) {
log.debug("Shutting down %s", toString)
old.shutdownNow()
}
2010-09-12 11:24:53 +02:00
}
2010-07-29 17:29:51 +02:00
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
2010-10-11 16:11:51 +02:00
if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) {
try {
2010-10-22 17:50:48 +02:00
executorService.get() execute mbox
} catch {
case e: RejectedExecutionException =>
mbox.dispatcherLock.unlock()
throw e
}
}
} else log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", this, mbox)
2010-10-11 12:42:02 +02:00
override val toString = getClass.getSimpleName + "[" + name + "]"
2010-10-11 16:11:51 +02:00
def suspend(actorRef: ActorRef) {
log.debug("Suspending %s",actorRef.uuid)
getMailbox(actorRef).suspended.switchOn
}
def resume(actorRef: ActorRef) {
log.debug("Resuming %s",actorRef.uuid)
val mbox = getMailbox(actorRef)
mbox.suspended.switchOff
registerForExecution(mbox)
}
}
/**
* This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox.
*/
trait ExecutableMailbox extends Runnable { self: MessageQueue =>
2010-10-11 12:42:02 +02:00
def dispatcher: ExecutorBasedEventDrivenDispatcher
final def run = {
val reschedule = try {
try { processMailbox() } catch { case ie: InterruptedException => true }
} finally {
dispatcherLock.unlock()
}
if (reschedule || !self.isEmpty)
dispatcher.registerForExecution(this)
}
/**
* Process the messages in the mailbox
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox(): Boolean = {
2010-10-11 16:11:51 +02:00
if (self.suspended.isOn)
true
else {
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = dispatcher.throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
do {
nextMessage.invoke
if (throttle) { // Will be elided when false
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) ||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out
return !self.isEmpty
}
if (self.suspended.isOn)
return true
nextMessage = self.dequeue
} while (nextMessage ne null)
}
false
}
}
}