2009-12-11 16:37:44 +01:00
|
|
|
/**
|
2011-07-14 16:03:08 +02:00
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
2009-12-11 16:37:44 +01:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.dispatch
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2011-03-23 15:12:09 +01:00
|
|
|
import akka.event.EventHandler
|
2010-10-22 17:50:48 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicReference
|
2011-05-18 17:25:30 +02:00
|
|
|
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue }
|
2011-09-20 15:43:57 +02:00
|
|
|
import akka.actor.{ ActorCell, ActorKilledException }
|
2010-05-27 15:50:11 +12:00
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
/**
|
|
|
|
|
* Default settings are:
|
|
|
|
|
* <pre>
|
|
|
|
|
* - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
|
|
|
|
* - NR_START_THREADS = 16
|
|
|
|
|
* - NR_MAX_THREADS = 128
|
|
|
|
|
* - KEEP_ALIVE_TIME = 60000L // one minute
|
|
|
|
|
* </pre>
|
|
|
|
|
* <p/>
|
|
|
|
|
*
|
|
|
|
|
* The dispatcher has a fluent builder interface to build up a thread pool to suit your use-case.
|
|
|
|
|
* There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
|
|
|
|
|
* <p/>
|
|
|
|
|
*
|
|
|
|
|
* Scala API.
|
|
|
|
|
* <p/>
|
|
|
|
|
* Example usage:
|
|
|
|
|
* <pre>
|
2011-05-20 22:41:41 +02:00
|
|
|
* val dispatcher = new Dispatcher("name")
|
2009-12-11 16:37:44 +01:00
|
|
|
* dispatcher
|
|
|
|
|
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
|
|
|
|
* .setCorePoolSize(16)
|
|
|
|
|
* .setMaxPoolSize(128)
|
|
|
|
|
* .setKeepAliveTimeInMillis(60000)
|
|
|
|
|
* .setRejectionPolicy(new CallerRunsPolicy)
|
|
|
|
|
* .buildThreadPool
|
|
|
|
|
* </pre>
|
|
|
|
|
* <p/>
|
|
|
|
|
*
|
|
|
|
|
* Java API.
|
|
|
|
|
* <p/>
|
|
|
|
|
* Example usage:
|
|
|
|
|
* <pre>
|
2011-05-20 22:41:41 +02:00
|
|
|
* Dispatcher dispatcher = new Dispatcher("name");
|
2009-12-11 16:37:44 +01:00
|
|
|
* dispatcher
|
|
|
|
|
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
|
|
|
|
* .setCorePoolSize(16)
|
|
|
|
|
* .setMaxPoolSize(128)
|
|
|
|
|
* .setKeepAliveTimeInMillis(60000)
|
|
|
|
|
* .setRejectionPolicy(new CallerRunsPolicy())
|
|
|
|
|
* .buildThreadPool();
|
|
|
|
|
* </pre>
|
|
|
|
|
* <p/>
|
|
|
|
|
*
|
|
|
|
|
* But the preferred way of creating dispatchers is to use
|
2010-10-26 12:49:25 +02:00
|
|
|
* the {@link akka.dispatch.Dispatchers} factory object.
|
2009-12-11 16:37:44 +01:00
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
2010-06-02 16:56:36 +02:00
|
|
|
* @param throughput positive integer indicates the dispatcher will only process so many messages at a time from the
|
|
|
|
|
* mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
|
|
|
|
|
* always continues until the mailbox is empty.
|
2011-04-23 08:11:31 +02:00
|
|
|
* Larger values (or zero or negative) increase throughput, smaller values increase fairness
|
2009-12-11 16:37:44 +01:00
|
|
|
*/
|
2011-05-20 22:41:41 +02:00
|
|
|
class Dispatcher(
  _name: String,
  val throughput: Int = Dispatchers.THROUGHPUT,
  val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
  val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
  executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig())
  extends MessageDispatcher {

  // The auxiliary constructors below spell out the default arguments of the primary
  // constructor explicitly, because Java callers cannot use Scala default arguments.

  def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
    this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage

  def this(_name: String, throughput: Int, mailboxType: MailboxType) =
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage

  def this(_name: String, throughput: Int) =
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage

  def this(_name: String, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _executorServiceFactoryProvider)

  def this(_name: String) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage

  /**
   * Full name of this dispatcher: the user supplied name with a fixed prefix.
   * It is also the name handed to the executor service factory provider.
   */
  val name = "akka:event-driven:dispatcher:" + _name

  /**
   * Factory obtained from the configured provider, used both for the initial executor
   * service and for the replacement installed by `shutdown`.
   */
  protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)

  /**
   * The executor service currently in use. It is held in an AtomicReference so that
   * `shutdown` can swap it for a fresh one in a single atomic step.
   */
  // NOTE(review): LazyExecutorServiceWrapper presumably defers creating the real executor until first use - confirm
  protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))

  /**
   * Enqueues the envelope on its receiver's mailbox and then asks for that mailbox to be
   * registered for execution, hinting that a regular message is available.
   */
  protected[akka] def dispatch(invocation: Envelope) = {
    val mbox = invocation.receiver.mailbox
    mbox enqueue invocation
    registerForExecution(mbox, true, false)
  }

  /**
   * Enqueues the system envelope on its receiver's mailbox and then asks for that mailbox
   * to be registered for execution, hinting that a system message is available.
   */
  protected[akka] def systemDispatch(invocation: SystemEnvelope) = {
    val mbox = invocation.receiver.mailbox
    mbox systemEnqueue invocation
    registerForExecution(mbox, false, true)
  }

  /**
   * Hands the task to the current executor service.
   *
   * A RejectedExecutionException is logged as a warning and then rethrown to the caller.
   */
  protected[akka] def executeTask(invocation: TaskInvocation): Unit = {
    try {
      executorService.get() execute invocation
    } catch {
      case e: RejectedExecutionException ⇒
        EventHandler.warning(this, e.toString)
        throw e
    }
  }

  /**
   * Creates a mailbox of the configured `mailboxType`; the `actor` argument is not used here.
   */
  protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(this)

  /**
   * Nothing to do on start for this dispatcher.
   */
  protected[akka] def start: Unit = {}

  /**
   * Atomically replaces the current executor service with a fresh wrapper and shuts down
   * the one that was previously installed.
   */
  protected[akka] def shutdown: Unit = {
    val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
    if (old ne null)
      old.shutdown()
  }

  /**
   * Returns if it was registered
   *
   * The mailbox is submitted to the executor service only if it reports that it should be
   * registered and its dispatcher lock could be acquired. If the executor rejects the
   * mailbox, the lock is released again and the RejectedExecutionException is rethrown.
   */
  protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
    if (mbox.shouldBeRegisteredForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
      if (mbox.dispatcherLock.tryLock()) {
        try {
          executorService.get() execute mbox
          true
        } catch {
          case e: RejectedExecutionException ⇒
            // Release the lock before logging: if the logging call itself fails, the
            // mailbox must not be left locked, or it could never be scheduled again.
            mbox.dispatcherLock.unlock()
            EventHandler.warning(this, e.toString)
            throw e
        }
      } else false
    } else false
  }

  // Computed once at construction; getClass is the runtime class, so subclasses report their own simple name.
  override val toString = getClass.getSimpleName + "[" + name + "]"
}
|
|
|
|
|
|
2011-04-11 17:08:10 +02:00
|
|
|
object PriorityGenerator {

  /**
   * Wraps the given function in a PriorityGenerator: the priority of a message is
   * whatever `priorityFunction` returns for it.
   */
  def apply(priorityFunction: Any ⇒ Int): PriorityGenerator = {
    val generator = new PriorityGenerator {
      // Delegate straight to the supplied function.
      def gen(msg: Any): Int = priorityFunction.apply(msg)
    }
    generator
  }
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
|
2011-05-20 22:41:41 +02:00
|
|
|
* PriorityDispatcher
|
2011-04-11 17:08:10 +02:00
|
|
|
*/
|
2011-09-21 15:01:47 +02:00
|
|
|
abstract class PriorityGenerator extends java.util.Comparator[Envelope] {

  /**
   * Computes the priority of the given message.
   *
   * @param message the payload of an envelope
   * @return the priority; `compare` orders envelopes by ascending value of this number
   */
  def gen(message: Any): Int

  /**
   * Orders two envelopes by the priorities that `gen` assigns to their messages.
   *
   * @return a negative number, zero or a positive number when the priority of `thisMessage`
   *         is respectively less than, equal to or greater than that of `thatMessage`
   */
  final def compare(thisMessage: Envelope, thatMessage: Envelope): Int = {
    val thisPriority = gen(thisMessage.message)
    val thatPriority = gen(thatMessage.message)
    // Compare explicitly instead of subtracting: a difference of two Ints can overflow
    // (e.g. Int.MinValue - 1) and would then report the wrong ordering.
    if (thisPriority < thatPriority) -1
    else if (thisPriority > thatPriority) 1
    else 0
  }
}
|
|
|
|
|
|
2011-09-21 15:01:47 +02:00
|
|
|
// TODO: should this be deleted, given that any dispatcher can now use UnboundedPriorityMailbox?
|
|
|
|
|
|
2011-03-09 18:20:48 +01:00
|
|
|
/**
|
2011-05-20 22:41:41 +02:00
|
|
|
* A version of Dispatcher that gives all actors registered to it a priority mailbox,
|
2011-03-09 18:20:48 +01:00
|
|
|
* prioritized according to the supplied comparator.
|
2011-04-11 17:08:10 +02:00
|
|
|
*
|
|
|
|
|
* The dispatcher will process the messages with the _lowest_ priority first.
|
2011-03-09 18:20:48 +01:00
|
|
|
*/
|
2011-05-20 22:41:41 +02:00
|
|
|
class PriorityDispatcher(
  name: String,
  val comparator: java.util.Comparator[Envelope],
  throughput: Int = Dispatchers.THROUGHPUT,
  throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
  mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
  executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig())
  extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, executorServiceFactoryProvider) {

  // Explicit overloads mirroring the default arguments above, for callers that cannot use them.

  def this(name: String, comparator: java.util.Comparator[Envelope], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
    this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage

  def this(name: String, comparator: java.util.Comparator[Envelope], throughput: Int, mailboxType: MailboxType) =
    this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage

  def this(name: String, comparator: java.util.Comparator[Envelope], throughput: Int) =
    this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage

  def this(name: String, comparator: java.util.Comparator[Envelope], executorServiceFactoryProvider: ExecutorServiceFactoryProvider) =
    this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, executorServiceFactoryProvider)

  def this(name: String, comparator: java.util.Comparator[Envelope]) =
    this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage

  /**
   * The priority flavour of the requested mailbox type, built around `comparator`.
   * Only UnboundedMailbox and BoundedMailbox can be translated; any other mailbox type
   * is rejected with an IllegalArgumentException while the dispatcher is being constructed.
   */
  protected val mailbox = mailboxType match {
    case BoundedMailbox(capacity, pushTimeOut) ⇒
      BoundedPriorityMailbox(comparator, capacity, pushTimeOut)
    case _: UnboundedMailbox ⇒
      UnboundedPriorityMailbox(comparator)
    case unsupported ⇒
      val reason = "Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + unsupported + "]"
      throw new IllegalArgumentException(reason)
  }

  /**
   * Creates mailboxes from the priority mailbox type selected above instead of the plain
   * `mailboxType`; the `actor` argument is not used here.
   */
  override def createMailbox(actor: ActorCell): Mailbox = mailbox.create(this)
}
|