2009-12-11 16:37:44 +01:00
|
|
|
/**
|
2012-01-19 18:21:06 +01:00
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
2009-12-11 16:37:44 +01:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.dispatch
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2012-04-25 00:51:23 +02:00
|
|
|
import akka.event.Logging.Error
|
2010-10-22 17:50:48 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicReference
|
2011-12-01 17:03:30 +01:00
|
|
|
import akka.actor.ActorCell
|
2011-11-21 10:48:21 +01:00
|
|
|
import akka.util.Duration
|
2011-12-01 17:03:30 +01:00
|
|
|
import java.util.concurrent._
|
2012-05-16 17:14:49 +02:00
|
|
|
import akka.event.Logging
|
2010-05-27 15:50:11 +12:00
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
/**
|
2011-12-21 21:24:57 +01:00
|
|
|
* The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a
|
|
|
|
|
* `BlockingQueue`.
|
2009-12-11 16:37:44 +01:00
|
|
|
*
|
2011-12-21 21:24:57 +01:00
|
|
|
* The preferred way of creating dispatchers is to define configuration of it and use the
|
|
|
|
|
* the `lookup` method in [[akka.dispatch.Dispatchers]].
|
2009-12-11 16:37:44 +01:00
|
|
|
*
|
2010-06-02 16:56:36 +02:00
|
|
|
* @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the
|
|
|
|
|
* mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
|
|
|
|
|
* always continues until the mailbox is empty.
|
2011-04-23 08:11:31 +02:00
|
|
|
* Larger values (or zero or negative) increase throughput, smaller values increase fairness
|
2009-12-11 16:37:44 +01:00
|
|
|
*/
|
2011-05-20 22:41:41 +02:00
|
|
|
class Dispatcher(
|
2011-11-17 16:09:18 +01:00
|
|
|
_prerequisites: DispatcherPrerequisites,
|
2011-12-21 19:02:06 +01:00
|
|
|
val id: String,
|
2011-10-06 21:19:46 +02:00
|
|
|
val throughput: Int,
|
2011-11-21 10:48:21 +01:00
|
|
|
val throughputDeadlineTime: Duration,
|
2011-10-06 21:19:46 +02:00
|
|
|
val mailboxType: MailboxType,
|
|
|
|
|
executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
|
2011-11-21 10:48:21 +01:00
|
|
|
val shutdownTimeout: Duration)
|
2011-11-17 16:09:18 +01:00
|
|
|
extends MessageDispatcher(_prerequisites) {
|
2010-08-21 15:36:50 +02:00
|
|
|
|
2012-05-20 19:03:20 +02:00
|
|
|
private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate {
|
|
|
|
|
lazy val executor: ExecutorService = factory.createExecutorService
|
|
|
|
|
def copy(): LazyExecutorServiceDelegate = new LazyExecutorServiceDelegate(factory)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@volatile private var executorServiceDelegate: LazyExecutorServiceDelegate =
|
|
|
|
|
new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory))
|
2012-01-20 12:30:19 +01:00
|
|
|
|
2012-05-21 13:39:39 +02:00
|
|
|
protected final def executorService: ExecutorServiceDelegate = executorServiceDelegate
|
2010-03-04 21:22:16 +01:00
|
|
|
|
2012-05-16 17:14:49 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL USE ONLY
|
|
|
|
|
*/
|
2012-05-20 19:06:31 +02:00
|
|
|
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
|
2011-10-19 13:19:44 +02:00
|
|
|
val mbox = receiver.mailbox
|
2012-02-21 13:22:25 +01:00
|
|
|
mbox.enqueue(receiver.self, invocation)
|
2011-09-26 19:52:49 +02:00
|
|
|
registerForExecution(mbox, true, false)
|
2011-09-20 18:34:21 +02:00
|
|
|
}
|
|
|
|
|
|
2012-05-16 17:14:49 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL USE ONLY
|
|
|
|
|
*/
|
2012-05-20 19:06:31 +02:00
|
|
|
protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = {
|
2011-10-18 16:44:35 +02:00
|
|
|
val mbox = receiver.mailbox
|
2011-11-12 10:57:28 +01:00
|
|
|
mbox.systemEnqueue(receiver.self, invocation)
|
2011-09-26 19:52:49 +02:00
|
|
|
registerForExecution(mbox, false, true)
|
2010-09-11 15:24:09 +02:00
|
|
|
}
|
|
|
|
|
|
2012-05-16 17:14:49 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL USE ONLY
|
|
|
|
|
*/
|
2011-09-27 17:41:02 +02:00
|
|
|
protected[akka] def executeTask(invocation: TaskInvocation) {
|
2011-09-26 17:52:52 +02:00
|
|
|
try {
|
2012-05-20 19:03:20 +02:00
|
|
|
executorService execute invocation
|
2011-09-26 17:52:52 +02:00
|
|
|
} catch {
|
2011-05-18 17:25:30 +02:00
|
|
|
case e: RejectedExecutionException ⇒
|
2011-11-16 15:54:14 +01:00
|
|
|
try {
|
2012-05-20 19:03:20 +02:00
|
|
|
executorService execute invocation
|
2011-11-16 15:54:14 +01:00
|
|
|
} catch {
|
|
|
|
|
case e2: RejectedExecutionException ⇒
|
2012-04-25 00:51:23 +02:00
|
|
|
prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!"))
|
2011-11-16 15:54:14 +01:00
|
|
|
throw e2
|
|
|
|
|
}
|
2011-03-02 16:44:44 -07:00
|
|
|
}
|
|
|
|
|
}
|
2011-02-25 15:20:58 -07:00
|
|
|
|
2012-05-16 17:14:49 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL USE ONLY
|
|
|
|
|
*/
|
2012-06-13 17:57:56 +02:00
|
|
|
protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox =
|
|
|
|
|
new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue
|
2010-09-11 15:24:09 +02:00
|
|
|
|
2012-05-16 17:14:49 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL USE ONLY
|
|
|
|
|
*/
|
2012-05-20 19:03:20 +02:00
|
|
|
protected[akka] def shutdown: Unit = {
|
|
|
|
|
val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy
|
|
|
|
|
val es = synchronized { // FIXME getAndSet using ARFU or Unsafe
|
|
|
|
|
val service = executorServiceDelegate
|
|
|
|
|
executorServiceDelegate = newDelegate // just a quick getAndSet
|
|
|
|
|
service
|
|
|
|
|
}
|
|
|
|
|
es.shutdown()
|
|
|
|
|
}
|
2010-07-29 17:29:51 +02:00
|
|
|
|
2011-09-23 09:33:53 +02:00
|
|
|
/**
|
|
|
|
|
* Returns if it was registered
|
2012-05-16 17:14:49 +02:00
|
|
|
*
|
|
|
|
|
* INTERNAL USE ONLY
|
2011-09-23 09:33:53 +02:00
|
|
|
*/
|
2011-09-23 13:14:17 +02:00
|
|
|
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
|
2011-11-18 17:03:35 +01:00
|
|
|
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
|
2011-09-28 19:55:42 +02:00
|
|
|
if (mbox.setAsScheduled()) {
|
2011-02-27 23:44:04 +01:00
|
|
|
try {
|
2012-05-20 19:03:20 +02:00
|
|
|
executorService execute mbox
|
2011-09-23 09:33:53 +02:00
|
|
|
true
|
2011-02-27 23:44:04 +01:00
|
|
|
} catch {
|
2011-11-14 19:48:06 +01:00
|
|
|
case e: RejectedExecutionException ⇒
|
|
|
|
|
try {
|
2012-05-20 19:03:20 +02:00
|
|
|
executorService execute mbox
|
2011-11-14 19:48:06 +01:00
|
|
|
true
|
|
|
|
|
} catch { //Retry once
|
2012-04-25 00:51:23 +02:00
|
|
|
case e: RejectedExecutionException ⇒
|
|
|
|
|
mbox.setAsIdle()
|
|
|
|
|
prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
|
|
|
|
|
throw e
|
2011-11-14 19:48:06 +01:00
|
|
|
}
|
2011-02-27 23:44:04 +01:00
|
|
|
}
|
2011-09-26 19:52:49 +02:00
|
|
|
} else false
|
2011-09-23 09:33:53 +02:00
|
|
|
} else false
|
2011-02-28 22:54:32 +01:00
|
|
|
}
|
2010-10-11 12:42:02 +02:00
|
|
|
|
2012-05-20 19:06:31 +02:00
|
|
|
override val toString: String = Logging.simpleName(this) + "[" + id + "]"
|
2010-09-29 10:30:05 +02:00
|
|
|
}
|
|
|
|
|
|
2011-04-11 17:08:10 +02:00
|
|
|
object PriorityGenerator {
|
|
|
|
|
/**
|
|
|
|
|
* Creates a PriorityGenerator that uses the supplied function as priority generator
|
|
|
|
|
*/
|
2011-05-18 17:25:30 +02:00
|
|
|
def apply(priorityFunction: Any ⇒ Int): PriorityGenerator = new PriorityGenerator {
|
2011-04-11 17:08:10 +02:00
|
|
|
def gen(message: Any): Int = priorityFunction(message)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
|
2011-05-20 22:41:41 +02:00
|
|
|
* PriorityDispatcher
|
2011-04-11 17:08:10 +02:00
|
|
|
*/
|
2011-09-21 15:01:47 +02:00
|
|
|
abstract class PriorityGenerator extends java.util.Comparator[Envelope] {
|
2011-04-11 17:08:10 +02:00
|
|
|
def gen(message: Any): Int
|
|
|
|
|
|
2011-09-21 15:01:47 +02:00
|
|
|
final def compare(thisMessage: Envelope, thatMessage: Envelope): Int =
|
2011-04-11 17:08:10 +02:00
|
|
|
gen(thisMessage.message) - gen(thatMessage.message)
|
2011-12-09 18:44:59 +01:00
|
|
|
}
|