pekko/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala

168 lines
5.7 KiB
Scala
Raw Normal View History

/**
2009-12-27 16:01:53 +01:00
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import java.util.Queue
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
/**
* Default settings are:
* <pre/>
* - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
* - NR_START_THREADS = 16
* - NR_MAX_THREADS = 128
* - KEEP_ALIVE_TIME = 60000L // one minute
* </pre>
* <p/>
*
* The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
* There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
* <p/>
*
* Scala API.
* <p/>
* Example usage:
* <pre/>
* val dispatcher = new ExecutorBasedEventDrivenDispatcher("name")
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy)
* .buildThreadPool
* </pre>
* <p/>
*
* Java API.
* <p/>
* Example usage:
* <pre/>
* ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name");
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy())
* .buildThreadPool();
* </pre>
* <p/>
*
* But the preferred way of creating dispatchers is to use
2010-03-07 09:44:03 +01:00
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the
* mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
* always continues until the mailbox is empty.
2010-06-30 16:26:15 +02:00
* Larger values (or zero or negative) increase througput, smaller values increase fairness
*/
class ExecutorBasedEventDrivenDispatcher(
2010-08-21 16:13:16 +02:00
_name: String,
throughput: Int = Dispatchers.THROUGHPUT,
capacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage
mailboxCapacity = capacity
2010-08-21 16:13:16 +02:00
@volatile private var active: Boolean = false
val name = "akka:event-driven:dispatcher:" + _name
init
def dispatch(invocation: MessageInvocation) = {
getMailbox(invocation.receiver).add(invocation)
dispatch(invocation.receiver)
}
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Queue[MessageInvocation]]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxCapacity <= 0) new ConcurrentLinkedQueue[MessageInvocation]
else new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
}
def dispatch(receiver: ActorRef): Unit = if (active) {
executor.execute(new Runnable() {
def run = {
var lockAcquiredOnce = false
var finishedBeforeMailboxEmpty = false
val lock = receiver.dispatcherLock
val mailbox = getMailbox(receiver)
// this do-while loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
do {
2010-04-30 20:22:45 +02:00
if (lock.tryLock) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
lockAcquiredOnce = true
try {
finishedBeforeMailboxEmpty = processMailbox(receiver)
} finally {
2010-04-30 20:22:45 +02:00
lock.unlock
if (finishedBeforeMailboxEmpty) dispatch(receiver)
}
}
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty))
2009-12-15 20:38:53 +01:00
}
})
} else {
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver)
}
/**
* Process the messages in the mailbox of the given actor.
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
def processMailbox(receiver: ActorRef): Boolean = {
var processedMessages = 0
val mailbox = getMailbox(receiver)
var messageInvocation = mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
processedMessages += 1
// check if we simply continue with other messages, or reached the throughput limit
if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.poll
else {
messageInvocation = null
return !mailbox.isEmpty
}
}
false
}
def start = if (!active) {
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
active = true
}
def shutdown = if (active) {
log.debug("Shutting down %s", toString)
executor.shutdownNow
active = false
2010-08-27 12:12:33 +02:00
uuids.clear
}
2010-05-21 20:08:49 +02:00
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
2010-07-29 17:29:51 +02:00
override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]"
// FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
}