2009-12-11 16:37:44 +01:00
|
|
|
/**
|
2010-12-22 15:35:50 +01:00
|
|
|
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
2009-12-11 16:37:44 +01:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.dispatch
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2011-03-02 18:19:17 +01:00
|
|
|
import akka.actor.{ActorRef, IllegalActorStateException, EventHandler}
|
2010-11-22 17:58:21 +01:00
|
|
|
import akka.util.{ReflectiveAccess, Switch}
|
2010-08-21 10:45:00 +02:00
|
|
|
|
|
|
|
|
import java.util.Queue
|
2010-10-22 17:50:48 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicReference
|
2011-02-14 02:34:40 +01:00
|
|
|
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
|
2010-05-27 15:50:11 +12:00
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
/**
|
|
|
|
|
* Default settings are:
|
|
|
|
|
* <pre/>
|
|
|
|
|
* - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
|
|
|
|
* - NR_START_THREADS = 16
|
|
|
|
|
* - NR_MAX_THREADS = 128
|
|
|
|
|
* - KEEP_ALIVE_TIME = 60000L // one minute
|
|
|
|
|
* </pre>
|
|
|
|
|
* <p/>
|
|
|
|
|
*
|
|
|
|
|
* The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
|
|
|
|
|
* There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
|
|
|
|
|
* <p/>
|
|
|
|
|
*
|
|
|
|
|
* Scala API.
|
|
|
|
|
* <p/>
|
|
|
|
|
* Example usage:
|
|
|
|
|
* <pre/>
|
|
|
|
|
* val dispatcher = new ExecutorBasedEventDrivenDispatcher("name")
|
|
|
|
|
* dispatcher
|
|
|
|
|
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
|
|
|
|
* .setCorePoolSize(16)
|
|
|
|
|
* .setMaxPoolSize(128)
|
|
|
|
|
* .setKeepAliveTimeInMillis(60000)
|
|
|
|
|
* .setRejectionPolicy(new CallerRunsPolicy)
|
|
|
|
|
* .buildThreadPool
|
|
|
|
|
* </pre>
|
|
|
|
|
* <p/>
|
|
|
|
|
*
|
|
|
|
|
* Java API.
|
|
|
|
|
* <p/>
|
|
|
|
|
* Example usage:
|
|
|
|
|
* <pre/>
|
|
|
|
|
* ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name");
|
|
|
|
|
* dispatcher
|
|
|
|
|
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
|
|
|
|
* .setCorePoolSize(16)
|
|
|
|
|
* .setMaxPoolSize(128)
|
|
|
|
|
* .setKeepAliveTimeInMillis(60000)
|
|
|
|
|
* .setRejectionPolicy(new CallerRunsPolicy())
|
|
|
|
|
* .buildThreadPool();
|
|
|
|
|
* </pre>
|
|
|
|
|
* <p/>
|
|
|
|
|
*
|
|
|
|
|
* But the preferred way of creating dispatchers is to use
|
2010-10-26 12:49:25 +02:00
|
|
|
* the {@link akka.dispatch.Dispatchers} factory object.
|
2009-12-11 16:37:44 +01:00
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
2010-06-02 16:56:36 +02:00
|
|
|
* @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the
|
|
|
|
|
* mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
|
|
|
|
|
* always continues until the mailbox is empty.
|
2010-06-30 16:26:15 +02:00
|
|
|
* Larger values (or zero or negative) increase througput, smaller values increase fairness
|
2009-12-11 16:37:44 +01:00
|
|
|
*/
|
2010-08-21 15:36:50 +02:00
|
|
|
class ExecutorBasedEventDrivenDispatcher(
|
2010-08-21 16:13:16 +02:00
|
|
|
_name: String,
|
2010-09-09 17:34:05 +02:00
|
|
|
val throughput: Int = Dispatchers.THROUGHPUT,
|
2010-09-21 18:52:41 +02:00
|
|
|
val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
2011-01-20 17:16:44 +01:00
|
|
|
val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
2010-10-22 17:50:48 +02:00
|
|
|
val config: ThreadPoolConfig = ThreadPoolConfig())
|
|
|
|
|
extends MessageDispatcher {
|
2010-08-21 15:36:50 +02:00
|
|
|
|
2010-10-29 16:33:31 +02:00
|
|
|
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
|
2010-10-22 17:50:48 +02:00
|
|
|
this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
|
2010-09-04 12:00:12 +02:00
|
|
|
|
2010-10-29 16:33:31 +02:00
|
|
|
def this(_name: String, throughput: Int, mailboxType: MailboxType) =
|
2010-09-21 18:52:41 +02:00
|
|
|
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
|
2010-08-21 16:13:16 +02:00
|
|
|
|
2010-10-29 16:33:31 +02:00
|
|
|
def this(_name: String, throughput: Int) =
|
2010-09-21 18:52:41 +02:00
|
|
|
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
|
|
|
|
|
2010-10-24 00:36:56 +02:00
|
|
|
def this(_name: String, _config: ThreadPoolConfig) =
|
|
|
|
|
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
|
|
|
|
|
|
2010-10-29 16:33:31 +02:00
|
|
|
def this(_name: String) =
|
2010-09-21 18:52:41 +02:00
|
|
|
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
|
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
val name = "akka:event-driven:dispatcher:" + _name
|
2010-09-21 18:52:41 +02:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
private[akka] val threadFactory = new MonitorableThreadFactory(name)
|
2010-10-24 00:36:56 +02:00
|
|
|
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
|
2010-03-04 21:22:16 +01:00
|
|
|
|
2010-10-25 12:51:13 +02:00
|
|
|
private[akka] def dispatch(invocation: MessageInvocation) = {
|
2010-09-11 15:24:09 +02:00
|
|
|
val mbox = getMailbox(invocation.receiver)
|
|
|
|
|
mbox enqueue invocation
|
2010-10-11 14:02:46 +02:00
|
|
|
registerForExecution(mbox)
|
2010-09-11 15:24:09 +02:00
|
|
|
}
|
|
|
|
|
|
2011-02-25 15:20:58 -07:00
|
|
|
private[akka] def executeFuture(invocation: FutureInvocation): Unit = if (active.isOn) {
|
2011-03-02 16:44:44 -07:00
|
|
|
try executorService.get() execute invocation
|
|
|
|
|
catch {
|
|
|
|
|
case e: RejectedExecutionException =>
|
2011-03-07 19:17:45 -07:00
|
|
|
EventHandler notifyListeners EventHandler.Warning(this, e.toString)
|
2011-03-02 16:44:44 -07:00
|
|
|
throw e
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-02-25 15:20:58 -07:00
|
|
|
|
2010-09-11 15:24:09 +02:00
|
|
|
/**
|
|
|
|
|
* @return the mailbox associated with the actor
|
|
|
|
|
*/
|
2011-02-27 22:44:37 +01:00
|
|
|
protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
|
2010-09-11 15:24:09 +02:00
|
|
|
|
|
|
|
|
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
|
|
|
|
|
|
2011-01-20 17:16:44 +01:00
|
|
|
def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
|
2010-10-11 12:42:02 +02:00
|
|
|
case UnboundedMailbox(blocking) => new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox {
|
|
|
|
|
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
|
|
|
|
|
}
|
2010-10-29 16:33:31 +02:00
|
|
|
|
2010-09-21 18:52:41 +02:00
|
|
|
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
|
2010-10-22 17:50:48 +02:00
|
|
|
new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) with ExecutableMailbox {
|
2010-10-11 12:42:02 +02:00
|
|
|
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
|
|
|
|
|
}
|
2010-09-12 11:24:53 +02:00
|
|
|
}
|
2010-09-11 15:24:09 +02:00
|
|
|
|
2011-02-27 23:17:59 +01:00
|
|
|
private[akka] def start {}
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-25 12:51:13 +02:00
|
|
|
private[akka] def shutdown {
|
2010-10-24 00:36:56 +02:00
|
|
|
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
|
2010-10-22 17:50:48 +02:00
|
|
|
if (old ne null) {
|
|
|
|
|
old.shutdownNow()
|
|
|
|
|
}
|
2010-09-12 11:24:53 +02:00
|
|
|
}
|
2010-07-29 17:29:51 +02:00
|
|
|
|
2011-02-27 23:44:04 +01:00
|
|
|
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
|
|
|
|
|
if (mbox.dispatcherLock.tryLock()) {
|
|
|
|
|
if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended
|
|
|
|
|
try {
|
|
|
|
|
executorService.get() execute mbox
|
|
|
|
|
} catch {
|
|
|
|
|
case e: RejectedExecutionException =>
|
2011-03-07 12:18:00 +01:00
|
|
|
EventHandler notifyListeners EventHandler.Warning(this, e.toString)
|
2011-02-27 23:44:04 +01:00
|
|
|
mbox.dispatcherLock.unlock()
|
|
|
|
|
throw e
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock
|
2010-10-11 14:02:46 +02:00
|
|
|
}
|
|
|
|
|
}
|
2011-02-28 22:54:32 +01:00
|
|
|
}
|
2010-10-11 12:42:02 +02:00
|
|
|
|
2011-02-27 22:44:37 +01:00
|
|
|
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
|
|
|
|
|
registerForExecution(mbox)
|
2010-10-11 12:42:02 +02:00
|
|
|
|
|
|
|
|
override val toString = getClass.getSimpleName + "[" + name + "]"
|
2010-01-02 08:40:09 +01:00
|
|
|
|
2010-10-11 16:11:51 +02:00
|
|
|
def suspend(actorRef: ActorRef) {
|
2011-02-14 02:34:40 +01:00
|
|
|
getMailbox(actorRef).suspended.tryLock
|
2010-10-11 16:11:51 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def resume(actorRef: ActorRef) {
|
|
|
|
|
val mbox = getMailbox(actorRef)
|
2011-02-14 02:40:57 +01:00
|
|
|
mbox.suspended.tryUnlock
|
2011-02-27 22:44:37 +01:00
|
|
|
reRegisterForExecution(mbox)
|
2010-10-11 16:11:51 +02:00
|
|
|
}
|
2010-09-29 10:30:05 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox.
|
|
|
|
|
*/
|
|
|
|
|
trait ExecutableMailbox extends Runnable { self: MessageQueue =>
|
|
|
|
|
|
2010-10-11 12:42:02 +02:00
|
|
|
def dispatcher: ExecutorBasedEventDrivenDispatcher
|
2010-10-29 16:33:31 +02:00
|
|
|
|
2010-09-29 10:30:05 +02:00
|
|
|
final def run = {
|
2011-02-14 02:34:40 +01:00
|
|
|
try {
|
2011-02-14 02:51:26 +01:00
|
|
|
processMailbox()
|
|
|
|
|
} catch {
|
|
|
|
|
case ie: InterruptedException =>
|
2010-09-29 10:30:05 +02:00
|
|
|
} finally {
|
|
|
|
|
dispatcherLock.unlock()
|
|
|
|
|
}
|
2011-02-14 02:34:40 +01:00
|
|
|
if (!self.isEmpty)
|
2011-02-27 22:44:37 +01:00
|
|
|
dispatcher.reRegisterForExecution(this)
|
2010-09-29 10:30:05 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Process the messages in the mailbox
|
|
|
|
|
*
|
|
|
|
|
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
|
|
|
|
|
*/
|
2011-02-14 02:34:40 +01:00
|
|
|
final def processMailbox() {
|
|
|
|
|
if (!self.suspended.locked) {
|
2010-10-11 16:11:51 +02:00
|
|
|
var nextMessage = self.dequeue
|
2011-02-14 02:34:40 +01:00
|
|
|
if (nextMessage ne null) { //If we have a message
|
|
|
|
|
if (dispatcher.throughput <= 1) //If we only run one message per process
|
|
|
|
|
nextMessage.invoke //Just run it
|
|
|
|
|
else { //But otherwise, if we are throttled, we need to do some book-keeping
|
|
|
|
|
var processedMessages = 0
|
|
|
|
|
val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
|
|
|
|
|
val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0
|
|
|
|
|
do {
|
|
|
|
|
nextMessage.invoke
|
|
|
|
|
|
|
|
|
|
nextMessage =
|
|
|
|
|
if (self.suspended.locked) {
|
|
|
|
|
null //If we are suspended, abort
|
|
|
|
|
}
|
2011-02-14 02:51:26 +01:00
|
|
|
else { //If we aren't suspended, we need to make sure we're not overstepping our boundaries
|
2011-02-14 02:34:40 +01:00
|
|
|
processedMessages += 1
|
|
|
|
|
if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
|
2011-02-14 02:51:26 +01:00
|
|
|
null //We reached our boundaries, abort
|
2011-02-14 02:34:40 +01:00
|
|
|
else
|
|
|
|
|
self.dequeue //Dequeue the next message
|
|
|
|
|
}
|
|
|
|
|
} while (nextMessage ne null)
|
|
|
|
|
}
|
2010-10-11 16:11:51 +02:00
|
|
|
}
|
2010-09-29 10:30:05 +02:00
|
|
|
}
|
|
|
|
|
}
|
2011-03-09 18:20:48 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox,
|
|
|
|
|
* prioritized according to the supplied comparator.
|
|
|
|
|
*/
|
|
|
|
|
class PriorityExecutorBasedEventDrivenDispatcher(
|
|
|
|
|
name: String,
|
2011-03-10 12:03:57 +01:00
|
|
|
val comparator: java.util.Comparator[MessageInvocation],
|
2011-03-09 18:20:48 +01:00
|
|
|
throughput: Int = Dispatchers.THROUGHPUT,
|
|
|
|
|
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
2011-03-11 14:51:24 +01:00
|
|
|
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
2011-03-09 18:20:48 +01:00
|
|
|
config: ThreadPoolConfig = ThreadPoolConfig()
|
2011-03-10 12:03:57 +01:00
|
|
|
) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
|
2011-03-09 18:20:48 +01:00
|
|
|
|
2011-03-10 12:03:57 +01:00
|
|
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: UnboundedMailbox) =
|
2011-03-09 18:20:48 +01:00
|
|
|
this(name, comparator, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
|
|
|
|
|
|
2011-03-10 12:03:57 +01:00
|
|
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: UnboundedMailbox) =
|
2011-03-09 18:20:48 +01:00
|
|
|
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
|
|
|
|
|
|
|
|
|
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
|
2011-03-11 14:51:24 +01:00
|
|
|
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
2011-03-09 18:20:48 +01:00
|
|
|
|
|
|
|
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) =
|
2011-03-11 14:51:24 +01:00
|
|
|
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config)
|
2011-03-09 18:20:48 +01:00
|
|
|
|
|
|
|
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
|
2011-03-11 14:51:24 +01:00
|
|
|
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
2011-03-10 12:03:57 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
|
|
|
|
|
def comparator: java.util.Comparator[MessageInvocation]
|
2011-03-09 18:20:48 +01:00
|
|
|
|
2011-03-10 12:03:57 +01:00
|
|
|
override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match {
|
2011-03-09 18:20:48 +01:00
|
|
|
case UnboundedMailbox(blocking) => new UnboundedPriorityMessageQueue(blocking, comparator) with ExecutableMailbox {
|
2011-03-10 12:03:57 +01:00
|
|
|
def dispatcher = self
|
2011-03-09 18:20:48 +01:00
|
|
|
}
|
|
|
|
|
|
2011-03-11 14:51:24 +01:00
|
|
|
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
|
|
|
|
|
new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) with ExecutableMailbox {
|
2011-03-10 12:03:57 +01:00
|
|
|
def dispatcher = self
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
2011-03-09 18:20:48 +01:00
|
|
|
}
|
2011-03-09 18:11:45 +01:00
|
|
|
}
|