rewrite mailbox selection logic, see #3342

- add “mailbox-requirement” key to dispatcher section
- split out mailbox section, add akka.actor.default-mailbox
- rewrite findMarker method and use it for Props.create() and getting
  the required mailbox of an actor
- add ProducesMessageQueue trait for MailboxType so that requirements
  can be checked before trying to create the actor for real
- verify actor as well as dispatcher requirements for message queue
  before creation, even in remote-deployed case
- change MessageDispatcher constructor to take a Configurator, add that
  to migration guide
This commit is contained in:
Roland 2013-06-01 21:58:34 +02:00
parent 51ed174432
commit f317aaf711
37 changed files with 843 additions and 419 deletions

View file

@ -47,6 +47,14 @@ public class ActorCreationTest {
}
}
static interface I<T> extends Creator<UntypedActor> {}
static class F implements I<Object> {
@Override
public UntypedActor create() {
return null;
}
}
@Test
public void testRightCreator() {
final Props p = Props.create(new C());
@ -65,4 +73,10 @@ public class ActorCreationTest {
assertEquals(UntypedActor.class, p.actorClass());
}
@Test
public void testSuperinterface() {
final Props p = Props.create(new F());
assertEquals(UntypedActor.class, p.actorClass());
}
}

View file

@ -22,6 +22,13 @@ object ActorMailboxSpec {
mailbox-type = "akka.dispatch.BoundedMailbox"
}
requiring-bounded-dispatcher {
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-requirement = "akka.dispatch.BoundedMessageQueueSemantics"
}
unbounded-mailbox {
mailbox-type = "akka.dispatch.UnboundedMailbox"
}
@ -80,10 +87,32 @@ object ActorMailboxSpec {
dispatcher = bounded-dispatcher
mailbox = unbounded-mailbox
}
/bounded-deque-requirements-configured {
dispatcher = requiring-bounded-dispatcher
mailbox = akka.actor.mailbox.bounded-deque-based
}
/bounded-deque-require-unbounded-configured {
dispatcher = requiring-bounded-dispatcher
mailbox = akka.actor.mailbox.unbounded-deque-based
}
/bounded-deque-require-unbounded-unconfigured {
dispatcher = requiring-bounded-dispatcher
}
/bounded-deque-requirements-configured-props-disp {
mailbox = akka.actor.mailbox.bounded-deque-based
}
/bounded-deque-require-unbounded-configured-props-disp {
mailbox = akka.actor.mailbox.unbounded-deque-based
}
/bounded-deque-requirements-configured-props-mail {
dispatcher = requiring-bounded-dispatcher
}
/bounded-deque-require-unbounded-configured-props-mail {
dispatcher = requiring-bounded-dispatcher
}
/bounded-deque-require-unbounded-unconfigured-props-mail {
dispatcher = requiring-bounded-dispatcher
}
akka.actor.mailbox.requirements {
"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}
""")
@ -97,10 +126,16 @@ object ActorMailboxSpec {
class StashQueueReportingActor extends QueueReportingActor with Stash
val UnboundedMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[UnboundedMessageQueueSemantics])
val BoundedMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[BoundedMessageQueueSemantics])
val UnboundedDeqMailboxTypes = Seq(classOf[QueueBasedMessageQueue], classOf[DequeBasedMessageQueue],
val UnboundedMailboxTypes = Seq(classOf[UnboundedMessageQueueSemantics])
val BoundedMailboxTypes = Seq(classOf[BoundedMessageQueueSemantics])
val UnboundedDeqMailboxTypes = Seq(
classOf[DequeBasedMessageQueueSemantics],
classOf[UnboundedMessageQueueSemantics],
classOf[UnboundedDequeBasedMessageQueueSemantics])
val BoundedDeqMailboxTypes = Seq(
classOf[DequeBasedMessageQueueSemantics],
classOf[BoundedMessageQueueSemantics],
classOf[BoundedDequeBasedMessageQueueSemantics])
}
class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with DefaultTimeout with ImplicitSender {
@ -122,7 +157,7 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau
checkMailboxQueue(Props[QueueReportingActor], "default-default", UnboundedMailboxTypes)
}
"get an unbounded dequeu message queue when it is only configured on the props" in {
"get an unbounded deque message queue when it is only configured on the props" in {
checkMailboxQueue(Props[QueueReportingActor].withMailbox("akka.actor.mailbox.unbounded-deque-based"),
"default-override-from-props", UnboundedDeqMailboxTypes)
}
@ -132,7 +167,7 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau
"default-override-from-trait", BoundedMailboxTypes)
}
"get an unbounded dequeu message queue when it's only mixed with Stash" in {
"get an unbounded deque message queue when it's only mixed with Stash" in {
checkMailboxQueue(Props[StashQueueReportingActor],
"default-override-from-stash", UnboundedDeqMailboxTypes)
}
@ -141,14 +176,12 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau
checkMailboxQueue(Props[QueueReportingActor], "default-bounded", BoundedMailboxTypes)
}
"get an unbounded dequeu message queue when it's configured as mailbox" in {
"get an unbounded deque message queue when it's configured as mailbox" in {
checkMailboxQueue(Props[QueueReportingActor], "default-unbounded-deque", UnboundedDeqMailboxTypes)
}
"fail to create actor when an unbounded dequeu message queue is configured as mailbox overriding RequestMailbox" in {
filterEvents(EventFilter[ActorInitializationException]()) {
verifyActorTermination(system.actorOf(Props[BoundedQueueReportingActor], "default-unbounded-deque-override-trait"))
}
intercept[IllegalArgumentException](system.actorOf(Props[BoundedQueueReportingActor], "default-unbounded-deque-override-trait"))
}
"get an unbounded message queue when defined in dispatcher" in {
@ -156,9 +189,7 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau
}
"fail to create actor when an unbounded message queue is defined in dispatcher overriding RequestMailbox" in {
filterEvents(EventFilter[ActorInitializationException]()) {
verifyActorTermination(system.actorOf(Props[BoundedQueueReportingActor], "unbounded-default-override-trait"))
}
intercept[IllegalArgumentException](system.actorOf(Props[BoundedQueueReportingActor], "unbounded-default-override-trait"))
}
"get a bounded message queue when it's configured as mailbox overriding unbounded in dispatcher" in {
@ -183,5 +214,84 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau
"bounded-unbounded-override-props", UnboundedMailboxTypes)
}
"get a bounded deque-based message queue if configured and required" in {
checkMailboxQueue(Props[StashQueueReportingActor], "bounded-deque-requirements-configured", BoundedDeqMailboxTypes)
}
"fail with a unbounded deque-based message queue if configured and required" in {
intercept[IllegalArgumentException](system.actorOf(Props[StashQueueReportingActor], "bounded-deque-require-unbounded-configured"))
}
"fail with a bounded deque-based message queue if not configured" in {
intercept[IllegalArgumentException](system.actorOf(Props[StashQueueReportingActor], "bounded-deque-require-unbounded-unconfigured"))
}
"get a bounded deque-based message queue if configured and required with Props" in {
checkMailboxQueue(
Props[StashQueueReportingActor]
.withDispatcher("requiring-bounded-dispatcher")
.withMailbox("akka.actor.mailbox.bounded-deque-based"),
"bounded-deque-requirements-configured-props",
BoundedDeqMailboxTypes)
}
"fail with a unbounded deque-based message queue if configured and required with Props" in {
intercept[IllegalArgumentException](system.actorOf(
Props[StashQueueReportingActor]
.withDispatcher("requiring-bounded-dispatcher")
.withMailbox("akka.actor.mailbox.unbounded-deque-based"),
"bounded-deque-require-unbounded-configured-props"))
}
"fail with a bounded deque-based message queue if not configured with Props" in {
intercept[IllegalArgumentException](system.actorOf(
Props[StashQueueReportingActor]
.withDispatcher("requiring-bounded-dispatcher"),
"bounded-deque-require-unbounded-unconfigured-props"))
}
"get a bounded deque-based message queue if configured and required with Props (dispatcher)" in {
checkMailboxQueue(
Props[StashQueueReportingActor]
.withDispatcher("requiring-bounded-dispatcher"),
"bounded-deque-requirements-configured-props-disp",
BoundedDeqMailboxTypes)
}
"fail with a unbounded deque-based message queue if configured and required with Props (dispatcher)" in {
intercept[IllegalArgumentException](system.actorOf(
Props[StashQueueReportingActor]
.withDispatcher("requiring-bounded-dispatcher"),
"bounded-deque-require-unbounded-configured-props-disp"))
}
"fail with a bounded deque-based message queue if not configured with Props (dispatcher)" in {
intercept[IllegalArgumentException](system.actorOf(
Props[StashQueueReportingActor]
.withDispatcher("requiring-bounded-dispatcher"),
"bounded-deque-require-unbounded-unconfigured-props-disp"))
}
"get a bounded deque-based message queue if configured and required with Props (mailbox)" in {
checkMailboxQueue(
Props[StashQueueReportingActor]
.withMailbox("akka.actor.mailbox.bounded-deque-based"),
"bounded-deque-requirements-configured-props-mail",
BoundedDeqMailboxTypes)
}
"fail with a unbounded deque-based message queue if configured and required with Props (mailbox)" in {
intercept[IllegalArgumentException](system.actorOf(
Props[StashQueueReportingActor]
.withMailbox("akka.actor.mailbox.unbounded-deque-based"),
"bounded-deque-require-unbounded-configured-props-mail"))
}
"fail with a bounded deque-based message queue if not configured with Props (mailbox)" in {
intercept[IllegalArgumentException](system.actorOf(
Props[StashQueueReportingActor],
"bounded-deque-require-unbounded-unconfigured-props-mail"))
}
}
}

View file

@ -84,12 +84,10 @@ object ActorSystemSpec {
class SlowDispatcher(_config: Config, _prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(_config, _prerequisites) {
private val instance = new Dispatcher(
prerequisites,
this,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
mailBoxTypeConfigured,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {
val doneIt = new Switch

View file

@ -269,7 +269,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
case Failure(e) if check(n, e)
if (delay.isDefined) {
executor match {
case m: MessageDispatcher m.prerequisites.scheduler.scheduleOnce(delay.get)(run(n + 1))
case m: MessageDispatcher m.configurator.prerequisites.scheduler.scheduleOnce(delay.get)(run(n + 1))
case _ // Thread.sleep, ignore, or other?
}
} else run(n + 1)

View file

@ -78,12 +78,10 @@ object SupervisorHierarchySpec {
extends DispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher =
new Dispatcher(prerequisites,
new Dispatcher(this,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
mailBoxTypeConfigured,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {

View file

@ -529,12 +529,10 @@ object DispatcherModelSpec {
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher =
new Dispatcher(prerequisites,
new Dispatcher(this,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
mailBoxTypeConfigured,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor
@ -600,20 +598,17 @@ object BalancingDispatcherModelSpec {
}
class BalancingMessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
extends BalancingDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher =
new BalancingDispatcher(prerequisites,
override protected def create(mailboxType: MailboxType): BalancingDispatcher =
new BalancingDispatcher(this,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
mailBoxTypeConfigured,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor
override def dispatcher(): MessageDispatcher = instance
}
}

View file

@ -63,9 +63,6 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
{
c.getString("type") must equal("Dispatcher")
c.getString("executor") must equal("fork-join-executor")
c.getInt("mailbox-capacity") must equal(-1)
c.getMilliseconds("mailbox-push-timeout-time") must equal(10 * 1000)
c.getString("mailbox-type") must be("")
c.getMilliseconds("shutdown-timeout") must equal(1 * 1000)
c.getInt("throughput") must equal(5)
c.getMilliseconds("throughput-deadline-time") must equal(0)
@ -134,6 +131,18 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
io.getInt("default-backlog") must be(ioExtSettings.defaultBacklog)
}
}
{
val c = config.getConfig("akka.actor.default-mailbox")
// general mailbox config
{
c.getInt("mailbox-capacity") must equal(1000)
c.getMilliseconds("mailbox-push-timeout-time") must equal(10 * 1000)
c.getString("mailbox-type") must be("akka.dispatch.UnboundedMailbox")
}
}
}
}
}

View file

@ -209,7 +209,7 @@ object CustomMailboxSpec {
}
}
class MyMailbox(owner: ActorRef) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
class MyMailbox(owner: ActorRef) extends UnboundedQueueBasedMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}

View file

@ -32,7 +32,7 @@ object CallingThreadDispatcherModelSpec {
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher =
new CallingThreadDispatcher(prerequisites, UnboundedMailbox(), false) with MessageDispatcherInterceptor {
new CallingThreadDispatcher(this) with MessageDispatcherInterceptor {
override def id: String = config.getString("id")
}

View file

@ -281,30 +281,34 @@ akka {
# Throughput deadline for Dispatcher, set to 0 or negative for no deadline
throughput-deadline-time = 0ms
# If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using
# the property
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead
# to deadlock, use with care
# The following mailbox-push-timeout-time is only used for type=Dispatcher
# and only if mailbox-capacity > 0
mailbox-capacity = -1
# Specifies the timeout to add a new message to a mailbox that is full -
# negative number means infinite timeout. It is only used for type=Dispatcher
# and only if mailbox-capacity > 0
mailbox-push-timeout-time = 10s
# FQCN of the MailboxType, if not specified the default bounded or unbounded
# mailbox is used. The Class of the FQCN must have a public constructor with
# (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
mailbox-type = ""
# For BalancingDispatcher: If the balancing dispatcher should attempt to
# schedule idle actors using the same dispatcher when a message comes in,
# and the dispatchers ExecutorService is not fully busy already.
attempt-teamwork = on
# If this dispatcher requires a specific type of mailbox, specify the
# fully-qualified class name here; the actually created mailbox will
# be a subtype of this type. The empty string signifies no requirement.
mailbox-requirement = ""
}
default-mailbox {
# FQCN of the MailboxType. The Class of the FQCN must have a public constructor with
# (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
mailbox-type = "akka.dispatch.UnboundedMailbox"
# If the mailbox is bounded then it uses this setting to determine its
# capacity. The provided value must be positive.
# NOTICE:
# Up to version 2.1 the mailbox type was determined based on this setting;
# this is no longer the case, the type must explicitly name a bounded mailbox.
mailbox-capacity = 1000
# If the mailbox is bounded then this is the timeout for enqueueing
# in case the mailbox is full. Negative values signify infinite
# timeout, which should be avoided as it bears the risk of dead-lock.
mailbox-push-timeout-time = 10s
# For Actor with Stash: The default capacity of the stash.
# If negative (or zero) then an unbounded stash is used (default)
# If positive then a bounded stash is used and the capacity is set using
@ -322,10 +326,16 @@ akka {
requirements {
"akka.dispatch.UnboundedMessageQueueSemantics" =
akka.actor.mailbox.unbounded-queue-based
"akka.dispatch.DequeBasedMessageQueue" =
"akka.dispatch.BoundedMessageQueueSemantics" =
akka.actor.mailbox.bounded-queue-based
"akka.dispatch.DequeBasedMessageQueueSemantics" =
akka.actor.mailbox.unbounded-deque-based
"akka.dispatch.UnboundedDequeBasedMessageQueueSemantics" =
akka.actor.mailbox.unbounded-deque-based
"akka.dispatch.BoundedDequeBasedMessageQueueSemantics" =
akka.actor.mailbox.bounded-deque-based
"akka.dispatch.MultipleConsumerSemantics" =
akka.actor.mailbox.unbounded-queue-based
}
unbounded-queue-based {
@ -335,12 +345,26 @@ akka {
mailbox-type = "akka.dispatch.UnboundedMailbox"
}
bounded-queue-based {
# FQCN of the MailboxType, The Class of the FQCN must have a public
# constructor with (akka.actor.ActorSystem.Settings,
# com.typesafe.config.Config) parameters.
mailbox-type = "akka.dispatch.BoundedMailbox"
}
unbounded-deque-based {
# FQCN of the MailboxType, The Class of the FQCN must have a public
# constructor with (akka.actor.ActorSystem.Settings,
# com.typesafe.config.Config) parameters.
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
}
bounded-deque-based {
# FQCN of the MailboxType, The Class of the FQCN must have a public
# constructor with (akka.actor.ActorSystem.Settings,
# com.typesafe.config.Config) parameters.
mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox"
}
}
debug {

View file

@ -16,6 +16,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal
import akka.dispatch.MessageDispatcher
/**
* The actor context - the view of the actor cell from the actor.
@ -338,6 +339,7 @@ private[akka] class ActorCell(
val system: ActorSystemImpl,
val self: InternalActorRef,
val props: Props,
val dispatcher: MessageDispatcher,
val parent: InternalActorRef)
extends UntypedActorContext with Cell
with dungeon.ReceiveTimeout

View file

@ -271,6 +271,8 @@ private[akka] case object Nobody extends MinimalActorRef {
private[akka] class LocalActorRef private[akka] (
_system: ActorSystemImpl,
_props: Props,
_dispatcher: MessageDispatcher,
_mailboxType: MailboxType,
_supervisor: InternalActorRef,
override val path: ActorPath)
extends ActorRefWithCell with LocalRef {
@ -285,11 +287,11 @@ private[akka] class LocalActorRef private[akka] (
* actorCell before we call init and start, since we can start using "this"
* object from another thread as soon as we run init.
*/
private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor)
actorCell.init(sendSupervise = true)
private val actorCell: ActorCell = newActorCell(_system, this, _props, _dispatcher, _supervisor)
actorCell.init(sendSupervise = true, _mailboxType)
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, supervisor)
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, dispatcher: MessageDispatcher, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, dispatcher, supervisor)
protected def actorContext: ActorContext = actorCell

View file

@ -13,10 +13,12 @@ import akka.util.{ Switch, Helpers }
import akka.japi.Util.immutableSeq
import akka.util.Collections.EmptyImmutableSeq
import scala.util.{ Success, Failure }
import scala.util.control.NonFatal
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.annotation.implicitNotFound
import akka.ConfigurationException
import akka.dispatch.Mailboxes
/**
* Interface for all ActorRef providers to implement.
@ -559,10 +561,16 @@ private[akka] class LocalActorRefProvider private[akka] (
*/
protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
private lazy val defaultDispatcher = system.dispatchers.defaultGlobalDispatcher
private lazy val defaultMailbox = system.mailboxes.lookup(Mailboxes.DefaultMailboxId)
override lazy val rootGuardian: LocalActorRef =
new LocalActorRef(
system,
Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
defaultDispatcher,
defaultMailbox,
theOneWhoWalksTheBubblesOfSpaceTime,
rootPath) {
override def getParent: InternalActorRef = this
@ -581,7 +589,7 @@ private[akka] class LocalActorRefProvider private[akka] (
val cell = rootGuardian.underlying
cell.reserveChild("user")
val ref = new LocalActorRef(system, Props(classOf[LocalActorRefProvider.Guardian], guardianStrategy),
rootGuardian, rootPath / "user")
defaultDispatcher, defaultMailbox, rootGuardian, rootPath / "user")
cell.initChild(ref)
ref.start()
ref
@ -592,7 +600,7 @@ private[akka] class LocalActorRefProvider private[akka] (
cell.reserveChild("system")
val ref = new LocalActorRef(
system, Props(classOf[LocalActorRefProvider.SystemGuardian], systemGuardianStrategy, guardian),
rootGuardian, rootPath / "system")
defaultDispatcher, defaultMailbox, rootGuardian, rootPath / "system")
cell.initChild(ref)
ref.start()
ref
@ -712,19 +720,41 @@ private[akka] class LocalActorRefProvider private[akka] (
if (!system.dispatchers.hasDispatcher(props2.dispatcher))
throw new ConfigurationException(s"Dispatcher [${props2.dispatcher}] not configured for path $path")
if (async) new RepointableActorRef(system, props2, supervisor, path).initialize(async)
else new LocalActorRef(system, props2, supervisor, path)
try {
val dispatcher = system.dispatchers.lookup(props2.dispatcher)
val mailboxType = system.mailboxes.getMailboxType(props2, dispatcher.configurator.config)
if (async) new RepointableActorRef(system, props2, dispatcher, mailboxType, supervisor, path).initialize(async)
else new LocalActorRef(system, props2, dispatcher, mailboxType, supervisor, path)
} catch {
case NonFatal(e) throw new IllegalArgumentException(
s"configuration problem while creating [$path] with dispatcher [${props2.dispatcher}] and mailbox [${props2.mailbox}]", e)
}
case router
val lookup = if (lookupDeploy) deployer.lookup(path) else None
val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router))
val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) b withFallback a)
val p = props.withRouter(d.routerConfig)
if (!system.dispatchers.hasDispatcher(p.dispatcher))
throw new ConfigurationException(s"Dispatcher [${p.dispatcher}] not configured for routees of $path")
if (!system.dispatchers.hasDispatcher(d.routerConfig.routerDispatcher))
throw new ConfigurationException(s"Dispatcher [${p.dispatcher}] not configured for router of $path")
new RoutedActorRef(system, p, supervisor, path).initialize(async)
val routerProps =
Props(p.deploy.copy(dispatcher = p.routerConfig.routerDispatcher),
classOf[RoutedActorCell.RouterCreator], Vector(p.routerConfig))
val routerDispatcher = system.dispatchers.lookup(p.routerConfig.routerDispatcher)
val routerMailbox = system.mailboxes.getMailboxType(routerProps, routerDispatcher.configurator.config)
val routeeProps = p.withRouter(NoRouter)
// the RouteeProvider uses context.actorOf() to create the routees, which does not allow us to pass
// these through, but obtain them here for early verification
val routeeDispatcher = system.dispatchers.lookup(p.dispatcher)
val routeeMailbox = system.mailboxes.getMailboxType(routeeProps, routeeDispatcher.configurator.config)
new RoutedActorRef(system, routerProps, routerDispatcher, routerMailbox, routeeProps, supervisor, path).initialize(async)
}
}

View file

@ -559,13 +559,13 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def hasSystemMessages = false
}
val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess)
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings))
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings, mailboxes))
val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher
val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess)
val internalCallingThreadExecutionContext: ExecutionContext =
dynamicAccess.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$").getOrElse(
new ExecutionContext with BatchingExecutor {

View file

@ -91,11 +91,11 @@ object Props {
@deprecated("use Props.withDispatcher and friends", "2.2")
def apply(
creator: () Actor = Props.defaultCreator,
dispatcher: String = Dispatchers.DefaultDispatcherId,
dispatcher: String = Deploy.NoDispatcherGiven,
routerConfig: RouterConfig = Props.defaultRoutedProps,
deploy: Deploy = Props.defaultDeploy): Props = {
val d1 = if (dispatcher != Dispatchers.DefaultDispatcherId) deploy.copy(dispatcher = dispatcher) else deploy
val d1 = if (dispatcher != Deploy.NoDispatcherGiven) deploy.copy(dispatcher = dispatcher) else deploy
val d2 = if (routerConfig != Props.defaultRoutedProps) d1.copy(routerConfig = routerConfig) else d1
val p = Props(classOf[CreatorFunctionConsumer], creator)
if (d2 != Props.defaultDeploy) p.withDeploy(d2) else p
@ -125,22 +125,17 @@ object Props {
def create[T <: Actor](creator: Creator[T]): Props = {
if ((creator.getClass.getModifiers & Modifier.STATIC) == 0)
throw new IllegalArgumentException("cannot use non-static local Creator to create actors; make it static or top-level")
val cc = classOf[Creator[_]]
val ac = classOf[Actor]
@tailrec def findType(c: Class[_]): Class[_] = {
c.getGenericInterfaces collectFirst {
case t: ParameterizedType if cc.isAssignableFrom(t.getRawType.asInstanceOf[Class[_]])
val actorClass = Reflect.findMarker(creator.getClass, classOf[Creator[_]]) match {
case t: ParameterizedType
t.getActualTypeArguments.head match {
case c: Class[_] c // since T <: Actor
case v: TypeVariable[_]
v.getBounds collectFirst { case c: Class[_] if ac.isAssignableFrom(c) && c != ac c } getOrElse ac
}
} match {
case Some(x) x
case None findType(c.getSuperclass)
case x throw new IllegalArgumentException(s"unsupported type found in Creator argument [$x]")
}
}
apply(defaultDeploy, classOf[CreatorConsumer], findType(creator.getClass) :: creator :: Nil)
apply(defaultDeploy, classOf[CreatorConsumer], actorClass :: creator :: Nil)
}
}
@ -245,9 +240,9 @@ final case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]
* Convenience method for extracting the mailbox information from the
* contained [[Deploy]] instance.
*/
def mailbox: Option[String] = deploy.mailbox match {
case NoMailboxGiven None
case x Some(x)
def mailbox: String = deploy.mailbox match {
case NoMailboxGiven Mailboxes.DefaultMailboxId
case x x
}
/**

View file

@ -29,6 +29,8 @@ import util.Try
private[akka] class RepointableActorRef(
val system: ActorSystemImpl,
val props: Props,
val dispatcher: MessageDispatcher,
val mailboxType: MailboxType,
val supervisor: InternalActorRef,
val path: ActorPath)
extends ActorRefWithCell with RepointableRef {
@ -111,7 +113,7 @@ private[akka] class RepointableActorRef(
* unstarted cell. The cell must be fully functional.
*/
def newCell(old: UnstartedCell): Cell =
new ActorCell(system, this, props, supervisor).init(sendSupervise = false)
new ActorCell(system, this, props, dispatcher, supervisor).init(sendSupervise = false, mailboxType)
def start(): Unit = ()

View file

@ -3,7 +3,7 @@
*/
package akka.actor
import akka.dispatch.{ UnboundedDequeBasedMessageQueueSemantics, RequiresMessageQueue, Envelope, DequeBasedMessageQueue }
import akka.dispatch.{ UnboundedDequeBasedMessageQueueSemantics, RequiresMessageQueue, Envelope, DequeBasedMessageQueueSemantics }
import akka.AkkaException
/**
@ -47,7 +47,7 @@ import akka.AkkaException
* any trait/class that overrides the `preRestart` callback. This means it's not possible to write
* `Actor with MyActor with Stash` if `MyActor` overrides `preRestart`.
*/
trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueue]
trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueueSemantics]
/**
* The `UnboundedStash` trait is a version of `Stash` that enforces an unbounded stash for you actor.
@ -64,16 +64,16 @@ trait UnrestrictedStash extends Actor {
*/
private val capacity: Int = {
val dispatcher = context.system.settings.config.getConfig(context.props.dispatcher)
val config = dispatcher.withFallback(context.system.settings.config.getConfig("akka.actor.default-dispatcher"))
val config = dispatcher.withFallback(context.system.settings.config.getConfig("akka.actor.default-mailbox"))
config.getInt("stash-capacity")
}
/* The actor's deque-based message queue.
* `mailbox.queue` is the underlying `Deque`.
*/
private val mailbox: DequeBasedMessageQueue = {
private val mailbox: DequeBasedMessageQueueSemantics = {
context.asInstanceOf[ActorCell].mailbox.messageQueue match {
case queue: DequeBasedMessageQueue queue
case queue: DequeBasedMessageQueueSemantics queue
case other throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" +
"""An (unbounded) deque-based mailbox can be configured as follows:
| my-custom-mailbox {

View file

@ -13,6 +13,8 @@ import akka.actor._
import akka.serialization.SerializationExtension
import scala.util.control.NonFatal
import scala.util.control.Exception.Catcher
import akka.dispatch.MailboxType
import akka.dispatch.ProducesMessageQueue
private[akka] trait Dispatch { this: ActorCell
@ -30,8 +32,6 @@ private[akka] trait Dispatch { this: ActorCell ⇒
final def numberOfMessages: Int = mailbox.numberOfMessages
val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
final def isTerminated: Boolean = mailbox.isClosed
/**
@ -39,24 +39,29 @@ private[akka] trait Dispatch { this: ActorCell ⇒
* reasonably different from the previous UID of a possible actor with the same path,
* which can be achieved by using ThreadLocalRandom.current.nextInt().
*/
final def init(sendSupervise: Boolean): this.type = {
final def init(sendSupervise: Boolean, mailboxType: MailboxType): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
val mbox = dispatcher.createMailbox(this)
val mbox = dispatcher.createMailbox(this, mailboxType)
/*
* The mailboxType was calculated taking into account what the MailboxType
* has promised to produce. If that was more than the default, then we need
* to reverify here because the dispatcher may well have screwed it up.
*/
// we need to delay the failure to the point of actor creation so we can handle
// it properly in the normal way
val actorClass = props.actorClass
val createMessage = if (system.mailboxes.hasRequiredType(actorClass)) {
Create(system.mailboxes.getRequiredType(actorClass).flatMap {
case c if !c.isAssignableFrom(mbox.messageQueue.getClass)
Some(ActorInitializationException(self, s"Actor [${self}] requires mailbox type [${c}]" +
s" got [${mbox.messageQueue.getClass}]"))
case _ None
})
} else Create(None)
val createMessage = mailboxType match {
case _: ProducesMessageQueue[_] if system.mailboxes.hasRequiredType(actorClass)
val req = system.mailboxes.getRequiredType(actorClass)
if (req isInstance mbox.messageQueue) Create(None)
else Create(Some(ActorInitializationException(self,
s"Actor [$self] requires mailbox type [$req] got [${mbox.messageQueue.getClass}]")))
case _ Create(None)
}
swapMailbox(mbox)
mailbox.setActor(this)

View file

@ -17,6 +17,10 @@ import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.util.Try
import scala.util.Failure
import akka.util.Reflect
import java.lang.reflect.ParameterizedType
final case class Envelope private (val message: Any, val sender: ActorRef)
@ -79,11 +83,11 @@ private[akka] object MessageDispatcher {
}
}
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext {
abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext {
import MessageDispatcher._
import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
import prerequisites._
import configurator.prerequisites._
@volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH!
@volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH!
@ -109,18 +113,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
/**
* Creates and returns a mailbox for the given actor.
*/
protected[akka] def createMailbox(actor: Cell): Mailbox
/**
* Finds out the mailbox type for an actor based on configuration, props and requirements.
*/
protected[akka] def getMailboxType(actor: Cell, mailboxType: MailboxType, mailboxTypeConfigured: Boolean): MailboxType =
actor.props.mailbox.flatMap(id actor.system.mailboxes.lookup(id)) match {
case Some(x) x
case None if mailboxTypeConfigured mailboxType
case None actor.system.mailboxes.getRequiredType(actor.props.actorClass).
flatMap(c actor.system.mailboxes.lookupByQueueType(c)).getOrElse(mailboxType)
}
protected[akka] def createMailbox(actor: Cell, mailboxType: MailboxType): Mailbox
/**
* Identifier of this dispatcher, corresponds to the full key
@ -156,8 +149,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
}
override def reportFailure(t: Throwable): Unit = t match {
case e: LogEventException prerequisites.eventStream.publish(e.event)
case _ prerequisites.eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage))
case e: LogEventException eventStream.publish(e.event)
case _ eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage))
}
@tailrec
@ -321,37 +314,6 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
*/
def dispatcher(): MessageDispatcher
/**
* Returns a factory for the [[akka.dispatch.Mailbox]] given the configuration.
* Default implementation instantiate the [[akka.dispatch.MailboxType]] specified
* as FQCN in mailbox-type config property. If mailbox-type is unspecified (empty)
* then [[akka.dispatch.UnboundedMailbox]] is used when capacity is < 1,
* otherwise [[akka.dispatch.BoundedMailbox]].
*/
def mailboxType(): MailboxType = {
config.getString("mailbox-type") match {
case ""
if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox()
else new BoundedMailbox(prerequisites.settings, config)
case "unbounded" UnboundedMailbox()
case "bounded" new BoundedMailbox(prerequisites.settings, config)
case fqcn
val args = List(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config)
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({
case exception
throw new IllegalArgumentException(
("Cannot instantiate MailboxType [%s], defined in [%s], " +
"make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters")
.format(fqcn, config.getString("id")), exception)
}).get
}
}
/**
* Was the mailbox type configured or derived?
*/
def mailBoxTypeConfigured: Boolean = config.getString("mailbox-type") != Deploy.NoMailboxGiven
def configureExecutor(): ExecutorServiceConfigurator = {
config.getString("executor") match {
case null | "" | "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)

View file

@ -29,16 +29,15 @@ import scala.concurrent.duration.FiniteDuration
* @see akka.dispatch.Dispatchers
*/
class BalancingDispatcher(
_prerequisites: DispatcherPrerequisites,
_configurator: MessageDispatcherConfigurator,
_id: String,
throughput: Int,
throughputDeadlineTime: Duration,
mailboxType: MailboxType,
_mailBoxTypeConfigured: Boolean,
_mailboxType: MailboxType,
_executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
_shutdownTimeout: FiniteDuration,
attemptTeamWork: Boolean)
extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _mailBoxTypeConfigured, _executorServiceFactoryProvider, _shutdownTimeout) {
extends Dispatcher(_configurator, _id, throughput, throughputDeadlineTime, _executorServiceFactoryProvider, _shutdownTimeout) {
/**
* INTERNAL API
@ -51,7 +50,7 @@ class BalancingDispatcher(
/**
* INTERNAL API
*/
private[akka] val messageQueue: MessageQueue = mailboxType.create(None, None)
private[akka] val messageQueue: MessageQueue = _mailboxType.create(None, None)
private class SharingMailbox(val system: ActorSystemImpl, _messageQueue: MessageQueue)
extends Mailbox(_messageQueue) with DefaultSystemMessageQueue {
@ -69,7 +68,8 @@ class BalancingDispatcher(
}
}
protected[akka] override def createMailbox(actor: akka.actor.Cell): Mailbox = new SharingMailbox(actor.systemImpl, messageQueue)
protected[akka] override def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox =
new SharingMailbox(actor.systemImpl, messageQueue)
protected[akka] override def register(actor: ActorCell): Unit = {
super.register(actor)

View file

@ -30,15 +30,15 @@ import java.lang.reflect.ParameterizedType
* Larger values (or zero or negative) increase throughput, smaller values increase fairness
*/
class Dispatcher(
_prerequisites: DispatcherPrerequisites,
_configurator: MessageDispatcherConfigurator,
val id: String,
val throughput: Int,
val throughputDeadlineTime: Duration,
val mailboxType: MailboxType,
val mailboxTypeConfigured: Boolean,
executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
val shutdownTimeout: FiniteDuration)
extends MessageDispatcher(_prerequisites) {
extends MessageDispatcher(_configurator) {
import configurator.prerequisites._
private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate {
lazy val executor: ExecutorService = factory.createExecutorService
@ -46,7 +46,7 @@ class Dispatcher(
}
@volatile private var executorServiceDelegate: LazyExecutorServiceDelegate =
new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory))
new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, threadFactory))
protected final def executorService: ExecutorServiceDelegate = executorServiceDelegate
@ -80,7 +80,7 @@ class Dispatcher(
executorService execute invocation
} catch {
case e2: RejectedExecutionException
prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!"))
eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!"))
throw e2
}
}
@ -89,8 +89,8 @@ class Dispatcher(
/**
* INTERNAL API
*/
protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox = {
new Mailbox(getMailboxType(actor, mailboxType, mailboxTypeConfigured).create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue
protected[akka] def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox = {
new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue
}
/**
@ -125,7 +125,7 @@ class Dispatcher(
} catch { //Retry once
case e: RejectedExecutionException
mbox.setAsIdle()
prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
throw e
}
}

View file

@ -11,6 +11,7 @@ import akka.event.Logging.Warning
import akka.event.EventStream
import scala.concurrent.duration.Duration
import akka.ConfigurationException
import akka.actor.Deploy
/**
* DispatcherPrerequisites represents useful contextual pieces when constructing a MessageDispatcher
@ -22,6 +23,7 @@ trait DispatcherPrerequisites {
def scheduler: Scheduler
def dynamicAccess: DynamicAccess
def settings: ActorSystem.Settings
def mailboxes: Mailboxes
}
/**
@ -33,7 +35,8 @@ private[akka] case class DefaultDispatcherPrerequisites(
val deadLetterMailbox: Mailbox,
val scheduler: Scheduler,
val dynamicAccess: DynamicAccess,
val settings: ActorSystem.Settings) extends DispatcherPrerequisites
val settings: ActorSystem.Settings,
val mailboxes: Mailboxes) extends DispatcherPrerequisites
object Dispatchers {
/**
@ -101,7 +104,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
}
//INTERNAL API
private def config(id: String): Config = {
private[akka] def config(id: String): Config = {
import scala.collection.JavaConverters._
def simpleName = id.substring(id.lastIndexOf('.') + 1)
idConfig(id)
@ -172,12 +175,10 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance = new Dispatcher(
prerequisites,
this,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
mailBoxTypeConfigured,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))
@ -187,20 +188,51 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi
override def dispatcher(): MessageDispatcher = instance
}
object BalancingDispatcherConfigurator {
private val defaultRequirement =
ConfigFactory.parseString("mailbox-requirement = akka.dispatch.MultipleConsumerSemantics")
def amendConfig(config: Config): Config =
if (config.getString("mailbox-requirement") != "") config
else defaultRequirement.withFallback(config)
}
/**
* Configurator for creating [[akka.dispatch.BalancingDispatcher]].
* Returns the same dispatcher instance for for each invocation
* of the `dispatcher()` method.
*/
class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
class BalancingDispatcherConfigurator(_config: Config, _prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(BalancingDispatcherConfigurator.amendConfig(_config), _prerequisites) {
private val instance = new BalancingDispatcher(
prerequisites,
private val instance = {
val mailboxes = prerequisites.mailboxes
val id = config.getString("id")
val requirement = mailboxes.getMailboxRequirement(config)
if (!classOf[MultipleConsumerSemantics].isAssignableFrom(requirement))
throw new IllegalArgumentException(
"BalancingDispatcher must have 'mailbox-requirement' which implements akka.dispatch.MultipleConsumerSemantics; " +
s"dispatcher [$id] has [$requirement]")
val conf = config.withFallback(prerequisites.settings.config.getConfig(Mailboxes.DefaultMailboxId))
val mailboxType =
if (conf.getString("mailbox-type") != Deploy.NoMailboxGiven) mailboxes.lookupByQueueType(requirement)
else {
val mt = mailboxes.lookup(conf.getString("mailbox-type"))
if (!requirement.isAssignableFrom(mailboxes.getProducedMessageQueueType(mt)))
throw new IllegalArgumentException(
s"BalancingDispatcher [$id] has 'mailbox-type' [${mt.getClass}] which is incompatible with 'mailbox-requirement' [$requirement]")
mt
}
create(mailboxType)
}
protected def create(mailboxType: MailboxType): BalancingDispatcher =
new BalancingDispatcher(
this,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType, mailBoxTypeConfigured, configureExecutor(),
mailboxType,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
config.getBoolean("attempt-teamwork"))
@ -233,7 +265,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer
*/
override def dispatcher(): MessageDispatcher =
new PinnedDispatcher(
prerequisites, null, config.getString("id"), mailboxType, mailBoxTypeConfigured,
this, null, config.getString("id"),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig)
}

View file

@ -347,7 +347,7 @@ trait MessageQueue {
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
}
class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue {
class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue with UnboundedMessageQueueSemantics {
final def enqueue(receiver: ActorRef, handle: Envelope): Unit = add(handle)
@ -418,10 +418,16 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
}
/**
* This is a marker trait for message queues which support multiple consumers,
* as is required by the BalancingDispatcher.
*/
trait MultipleConsumerSemantics
/**
* A QueueBasedMessageQueue is a MessageQueue backed by a java.util.Queue.
*/
trait QueueBasedMessageQueue extends MessageQueue {
trait QueueBasedMessageQueue extends MessageQueue with MultipleConsumerSemantics {
def queue: Queue[Envelope]
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
@ -440,7 +446,9 @@ trait QueueBasedMessageQueue extends MessageQueue {
* UnboundedMessageQueueSemantics adds unbounded semantics to a QueueBasedMessageQueue,
* i.e. a non-blocking enqueue and dequeue.
*/
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
trait UnboundedMessageQueueSemantics
trait UnboundedQueueBasedMessageQueue extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def dequeue(): Envelope = queue.poll()
}
@ -449,8 +457,11 @@ trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
* BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue,
* i.e. blocking enqueue with timeout.
*/
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
trait BoundedMessageQueueSemantics {
def pushTimeOut: Duration
}
trait BoundedQueueBasedMessageQueue extends QueueBasedMessageQueue with BoundedMessageQueueSemantics {
override def queue: BlockingQueue[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
@ -466,16 +477,23 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
/**
* DequeBasedMessageQueue refines QueueBasedMessageQueue to be backed by a java.util.Deque.
*/
trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
def queue: Deque[Envelope]
trait DequeBasedMessageQueueSemantics {
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit
}
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueueSemantics with UnboundedMessageQueueSemantics
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueueSemantics with BoundedMessageQueueSemantics
trait DequeBasedMessageQueue extends QueueBasedMessageQueue with DequeBasedMessageQueueSemantics {
def queue: Deque[Envelope]
}
/**
* UnboundedDequeBasedMessageQueueSemantics adds unbounded semantics to a DequeBasedMessageQueue,
* i.e. a non-blocking enqueue and dequeue.
*/
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
trait UnboundedDequeBasedMessageQueue extends DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle
def dequeue(): Envelope = queue.poll()
@ -485,7 +503,7 @@ trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
* BoundedMessageQueueSemantics adds bounded semantics to a DequeBasedMessageQueue,
* i.e. blocking enqueue with timeout.
*/
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
trait BoundedDequeBasedMessageQueue extends DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
def pushTimeOut: Duration
override def queue: BlockingDeque[Envelope]
@ -523,15 +541,21 @@ trait MailboxType {
def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue
}
trait ProducesMessageQueue[T <: MessageQueue]
/**
* UnboundedMailbox is the default unbounded MailboxType used by Akka Actors.
*/
case class UnboundedMailbox() extends MailboxType {
case class UnboundedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
new UnboundedMailbox.MessageQueue
}
object UnboundedMailbox {
class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {
final def queue: Queue[Envelope] = this
}
}
@ -541,17 +565,18 @@ case class UnboundedMailbox() extends MailboxType {
* the only drawback is that you can't have multiple consumers,
* which rules out using it with BalancingDispatcher for instance.
*/
case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType {
case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType with ProducesMessageQueue[NodeMessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue()
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue
}
/**
* BoundedMailbox is the default bounded MailboxType used by Akka Actors.
*/
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration) extends MailboxType {
case class BoundedMailbox(val capacity: Int, val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
@ -560,9 +585,13 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Finit
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new LinkedBlockingQueue[Envelope](capacity) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
new BoundedMailbox.MessageQueue(capacity, pushTimeOut)
}
object BoundedMailbox {
class MessageQueue(capacity: Int, final val pushTimeOut: FiniteDuration)
extends LinkedBlockingQueue[Envelope](capacity) with BoundedQueueBasedMessageQueue {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
}
@ -570,10 +599,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Finit
* UnboundedPriorityMailbox is an unbounded mailbox that allows for prioritization of its contents.
* Extend this class and provide the Comparator in the constructor.
*/
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType {
class UnboundedPriorityMailbox(val cmp: Comparator[Envelope], val initialCapacity: Int)
extends MailboxType with ProducesMessageQueue[UnboundedPriorityMailbox.MessageQueue] {
def this(cmp: Comparator[Envelope]) = this(cmp, 11)
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new PriorityBlockingQueue[Envelope](initialCapacity, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
new UnboundedPriorityMailbox.MessageQueue(initialCapacity, cmp)
}
object UnboundedPriorityMailbox {
class MessageQueue(initialCapacity: Int, cmp: Comparator[Envelope])
extends PriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue {
final def queue: Queue[Envelope] = this
}
}
@ -582,27 +617,37 @@ class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val i
* BoundedPriorityMailbox is a bounded mailbox that allows for prioritization of its contents.
* Extend this class and provide the Comparator in the constructor.
*/
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue] {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
new BoundedPriorityMailbox.MessageQueue(capacity, cmp, pushTimeOut)
}
object BoundedPriorityMailbox {
class MessageQueue(capacity: Int, cmp: Comparator[Envelope], val pushTimeOut: Duration)
extends BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
with BoundedQueueBasedMessageQueue {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
}
}
/**
* UnboundedDequeBasedMailbox is an unbounded MailboxType, backed by a Deque.
*/
case class UnboundedDequeBasedMailbox() extends MailboxType {
case class UnboundedDequeBasedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedDequeBasedMailbox.MessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new LinkedBlockingDeque[Envelope]() with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
new UnboundedDequeBasedMailbox.MessageQueue
}
object UnboundedDequeBasedMailbox {
class MessageQueue extends LinkedBlockingDeque[Envelope] with UnboundedDequeBasedMessageQueue {
final val queue = this
}
}
@ -610,7 +655,8 @@ case class UnboundedDequeBasedMailbox() extends MailboxType {
/**
* BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque.
*/
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration) extends MailboxType {
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
@ -619,9 +665,13 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTime
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null")
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new LinkedBlockingDeque[Envelope](capacity) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
new BoundedDequeBasedMailbox.MessageQueue(capacity, pushTimeOut)
}
object BoundedDequeBasedMailbox {
class MessageQueue(capacity: Int, val pushTimeOut: FiniteDuration)
extends LinkedBlockingDeque[Envelope](capacity) with BoundedDequeBasedMessageQueue {
final val queue = this
final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut
}
}

View file

@ -12,13 +12,24 @@ import akka.event.Logging.Warning
import akka.ConfigurationException
import scala.annotation.tailrec
import java.lang.reflect.ParameterizedType
import akka.util.Reflect
import akka.actor.Props
import akka.actor.Deploy
import scala.util.Try
import scala.util.Failure
import scala.util.control.NonFatal
import akka.dispatch.DispatcherPrerequisites
object Mailboxes {
final val DefaultMailboxId = "akka.actor.default-mailbox"
}
private[akka] class Mailboxes(
val settings: ActorSystem.Settings,
val eventStream: EventStream,
dynamicAccess: DynamicAccess) {
private val mailboxTypeConfigurators = new ConcurrentHashMap[String, Option[MailboxTypeConfigurator]]
private val mailboxTypeConfigurators = new ConcurrentHashMap[String, MailboxType]
private val mailboxBindings: Map[Class[_ <: Any], String] = {
import scala.collection.JavaConverters._
@ -38,108 +49,148 @@ private[akka] class Mailboxes(
/**
* Returns a mailbox type as specified in configuration, based on the id, or if not defined None.
*/
def lookup(id: String): Option[MailboxType] = lookupConfigurator(id).map { _.mailboxType }
def lookup(id: String): MailboxType = lookupConfigurator(id)
/**
* Returns a mailbox type as specified in configuration, based on the type, or if not defined None.
*/
def lookupByQueueType(queueType: Class[_ <: Any]): Option[MailboxType] =
lookupId(queueType).flatMap(x lookup(x))
def lookupByQueueType(queueType: Class[_ <: Any]): MailboxType = lookup(lookupId(queueType))
private final val rmqClass = classOf[RequiresMessageQueue[_]]
/**
* Return the required message queue type for this class if any.
*/
def getRequiredType(actorClass: Class[_ <: Actor]): Option[Class[_]] = {
@tailrec
def innerRequiredType(classes: Iterator[Class[_]]): Option[Class[_]] = {
if (classes.isEmpty) None
else {
val c = classes.next()
if (rmqClass.isAssignableFrom(c)) {
val ifaces = c.getGenericInterfaces
val tpe = ifaces.collectFirst {
case t: ParameterizedType if rmqClass.isAssignableFrom(t.getRawType.asInstanceOf[Class[_]])
t.getActualTypeArguments.head.asInstanceOf[Class[_]]
}
if (tpe.isDefined) tpe
else innerRequiredType(classes ++ ifaces.map {
def getRequiredType(actorClass: Class[_ <: Actor]): Class[_] =
Reflect.findMarker(actorClass, rmqClass) match {
case t: ParameterizedType t.getActualTypeArguments.head match {
case c: Class[_] c
case c: ParameterizedType c.getRawType.asInstanceOf[Class[_]]
} ++ Iterator(c.getSuperclass))
case x throw new IllegalArgumentException(s"no wildcard type allowed in RequireMessageQueue argument (was [$x])")
}
}
// dont care if this happens twice
private var mailboxSizeWarningIssued = false
def getMailboxRequirement(config: Config) = config.getString("mailbox-requirement") match {
case "" classOf[MessageQueue]
case x dynamicAccess.getClassFor[AnyRef](x).get
}
def getProducedMessageQueueType(mailboxType: MailboxType): Class[_] = {
val pmqClass = classOf[ProducesMessageQueue[_]]
if (!pmqClass.isAssignableFrom(mailboxType.getClass)) classOf[MessageQueue]
else Reflect.findMarker(mailboxType.getClass, pmqClass) match {
case t: ParameterizedType
t.getActualTypeArguments.head match {
case c: Class[_] c
case x throw new IllegalArgumentException(
s"no wildcard type allowed in ProducesMessageQueue argument (was [$x])")
}
}
}
/**
* Finds out the mailbox type for an actor based on configuration, props and requirements.
*/
protected[akka] def getMailboxType(props: Props, dispatcherConfig: Config): MailboxType = {
val id = dispatcherConfig.getString("id")
val deploy = props.deploy
val actorClass = props.actorClass
lazy val actorRequirement = getRequiredType(actorClass)
val mailboxRequirement: Class[_] = getMailboxRequirement(dispatcherConfig)
val hasMailboxRequirement: Boolean = mailboxRequirement != classOf[MessageQueue]
val hasMailboxType =
dispatcherConfig.hasPath("mailbox-type") &&
dispatcherConfig.getString("mailbox-type") != Deploy.NoMailboxGiven
// TODO remove in 2.3
if (!hasMailboxType && !mailboxSizeWarningIssued && dispatcherConfig.hasPath("mailbox-size")) {
eventStream.publish(Warning("mailboxes", getClass,
"ignoring setting 'mailbox-size' for dispatcher [$id], you need to specify 'mailbox-type=bounded'"))
mailboxSizeWarningIssued = true
}
def verifyRequirements(mailboxType: MailboxType): MailboxType = {
lazy val mqType: Class[_] = getProducedMessageQueueType(mailboxType)
if (hasMailboxRequirement && !mailboxRequirement.isAssignableFrom(mqType))
throw new IllegalArgumentException(
s"produced message queue type [$mqType] does not fulfill requirement for dispatcher [${id}]")
if (hasRequiredType(actorClass) && !actorRequirement.isAssignableFrom(mqType))
throw new IllegalArgumentException(
s"produced message queue type [$mqType] does not fulfill requirement for actor class [$actorClass]")
mailboxType
}
if (deploy.mailbox != Deploy.NoMailboxGiven) {
verifyRequirements(lookup(deploy.mailbox))
} else if (deploy.dispatcher != Deploy.NoDispatcherGiven && hasMailboxType) {
verifyRequirements(lookup(dispatcherConfig.getString("id")))
} else if (hasRequiredType(actorClass)) {
try verifyRequirements(lookupByQueueType(getRequiredType(actorClass)))
catch {
case NonFatal(thr) if (hasMailboxRequirement) verifyRequirements(lookupByQueueType(mailboxRequirement))
}
} else if (hasMailboxRequirement) {
verifyRequirements(lookupByQueueType(mailboxRequirement))
} else {
innerRequiredType(classes)
verifyRequirements(lookup(Mailboxes.DefaultMailboxId))
}
}
}
if (rmqClass.isAssignableFrom(actorClass)) innerRequiredType(Iterator(actorClass))
else None
}
/**
* Check if this class can have a required message queue type.
*/
def hasRequiredType(actorClass: Class[_ <: Actor]): Boolean = rmqClass.isAssignableFrom(actorClass)
private def lookupId(queueType: Class[_]): Option[String] = {
private def lookupId(queueType: Class[_]): String =
mailboxBindings.get(queueType) match {
case None
eventStream.publish(Warning("Mailboxes", this.getClass, s"Mailbox Mapping for [${queueType}] not configured"))
None
case s s
}
case None throw new IllegalArgumentException(s"Mailbox Mapping for [${queueType}] not configured")
case Some(s) s
}
private def lookupConfigurator(id: String): Option[MailboxTypeConfigurator] = {
private def lookupConfigurator(id: String): MailboxType = {
mailboxTypeConfigurators.get(id) match {
case null
// It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup.
val newConfigurator =
if (settings.config.hasPath(id)) {
Some(new MailboxTypeConfigurator(settings, config(id), dynamicAccess))
} else {
eventStream.publish(Warning("Mailboxes", this.getClass, s"Mailbox Type [${id}] not configured"))
None
}
if (newConfigurator.isDefined) {
mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match {
case null newConfigurator
case existing existing
}
} else None
case existing existing
}
}
//INTERNAL API
private def config(id: String): Config = {
import scala.collection.JavaConverters._
ConfigFactory.parseMap(Map("id" -> id).asJava)
.withFallback(settings.config.getConfig(id))
}
}
private[akka] class MailboxTypeConfigurator(
val settings: ActorSystem.Settings,
val config: Config,
dynamicAccess: DynamicAccess) {
private val instance: MailboxType = {
val id = config.getString("id")
config.getString("mailbox-type") match {
case "" throw new IllegalArgumentException(s"The setting mailbox-type, defined in [${id}}] is empty")
val newConfigurator = id match {
case "unbounded" UnboundedMailbox()
case "bounded" new BoundedMailbox(settings, config(id))
case _
if (!settings.config.hasPath(id)) throw new IllegalArgumentException(s"Mailbox Type [${id}] not configured")
val conf = config(id)
conf.getString("mailbox-type") match {
case "" throw new IllegalArgumentException(s"The setting mailbox-type, defined in [$id] is empty")
case fqcn
val args = List(classOf[ActorSystem.Settings] -> settings, classOf[Config] -> config)
val args = List(classOf[ActorSystem.Settings] -> settings, classOf[Config] -> conf)
dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({
case exception
throw new IllegalArgumentException(
(s"Cannot instantiate MailboxType [${fqcn}], defined in [${id}], make sure it has a public" +
(s"Cannot instantiate MailboxType [$fqcn], defined in [$id], make sure it has a public" +
" constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters"),
exception)
}).get
}
}
def mailboxType: MailboxType = instance
mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match {
case null newConfigurator
case existing existing
}
case existing existing
}
}
private val defaultMailboxConfig = settings.config.getConfig(Mailboxes.DefaultMailboxId)
//INTERNAL API
private def config(id: String): Config = {
import scala.collection.JavaConverters._
ConfigFactory.parseMap(Map("id" -> id).asJava)
.withFallback(settings.config.getConfig(id))
.withFallback(defaultMailboxConfig)
}
}

View file

@ -15,19 +15,15 @@ import scala.concurrent.duration.FiniteDuration
* the `lookup` method in [[akka.dispatch.Dispatchers]].
*/
class PinnedDispatcher(
_prerequisites: DispatcherPrerequisites,
_configurator: MessageDispatcherConfigurator,
_actor: ActorCell,
_id: String,
_mailboxType: MailboxType,
_mailboxTypeConfigured: Boolean,
_shutdownTimeout: FiniteDuration,
_threadPoolConfig: ThreadPoolConfig)
extends Dispatcher(_prerequisites,
extends Dispatcher(_configurator,
_id,
Int.MaxValue,
Duration.Zero,
_mailboxType,
_mailboxTypeConfigured,
_threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1),
_shutdownTimeout) {

View file

@ -5,7 +5,6 @@ package akka.routing
import language.implicitConversions
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor._
@ -20,22 +19,33 @@ import akka.event.Logging.Warning
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.annotation.tailrec
import akka.event.Logging.Warning
import akka.dispatch.{ MailboxType, MessageDispatcher }
import akka.dispatch.BalancingDispatcher
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
* send a message to one (or more) of these actors.
*/
private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath)
extends RepointableActorRef(_system, _props, _supervisor, _path) {
private[akka] class RoutedActorRef(
_system: ActorSystemImpl,
_routerProps: Props,
_routerDispatcher: MessageDispatcher,
_routerMailbox: MailboxType,
_routeeProps: Props,
_supervisor: InternalActorRef,
_path: ActorPath)
extends RepointableActorRef(_system, _routerProps, _routerDispatcher, _routerMailbox, _supervisor, _path) {
// verify that a BalancingDispatcher is not used with a Router
if (_props.routerConfig != NoRouter && _system.dispatchers.isBalancingDispatcher(_props.routerConfig.routerDispatcher)) {
if (_routerProps.routerConfig != NoRouter && _routerDispatcher.isInstanceOf[BalancingDispatcher]) {
throw new ConfigurationException(
"Configuration for " + this +
" is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.")
} else _props.routerConfig.verifyConfig(_path)
} else _routerProps.routerConfig.verifyConfig(_path)
override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor).init(sendSupervise = false)
override def newCell(old: UnstartedCell): Cell =
new RoutedActorCell(system, this, props, dispatcher, _routeeProps, supervisor)
.init(sendSupervise = false, mailboxType)
}
@ -46,16 +56,16 @@ private[akka] object RoutedActorCell {
}
}
private[akka] final class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActorRef, _props: Props, _supervisor: InternalActorRef)
extends ActorCell(
_system,
_ref,
_props.copy(
deploy = _props.deploy.copy(dispatcher = _props.routerConfig.routerDispatcher),
classOf[RoutedActorCell.RouterCreator], Vector(_props.routerConfig)),
_supervisor) {
private[akka] final class RoutedActorCell(
_system: ActorSystemImpl,
_ref: InternalActorRef,
_routerProps: Props,
_routerDispatcher: MessageDispatcher,
val routeeProps: Props,
_supervisor: InternalActorRef)
extends ActorCell(_system, _ref, _routerProps, _routerDispatcher, _supervisor) {
private[akka] val routerConfig = _props.routerConfig
private[akka] val routerConfig = _routerProps.routerConfig
private[akka] val resizeInProgress = new AtomicBoolean
private val resizeCounter = new AtomicLong
@ -72,7 +82,6 @@ private[akka] final class RoutedActorCell(_system: ActorSystemImpl, _ref: Intern
def route = _route
private def startRoute() {
val routeeProps = _props.withRouter(NoRouter)
_routeeProvider = routerConfig.createRouteeProvider(this, routeeProps)
val r = routerConfig.createRoute(routeeProvider)
// initial resize, before message send

View file

@ -5,6 +5,9 @@ package akka.util
import scala.util.control.NonFatal
import java.lang.reflect.Constructor
import scala.collection.immutable
import java.lang.reflect.Type
import scala.annotation.tailrec
import java.lang.reflect.ParameterizedType
/**
* Collection of internal reflection utilities which may or may not be
@ -95,4 +98,20 @@ private[akka] object Reflect {
* @return a function which when applied will create a new instance from the default constructor of the given class
*/
private[akka] def instantiator[T](clazz: Class[T]): () T = () instantiate(clazz)
def findMarker(root: Class[_], marker: Class[_]): Type = {
@tailrec def rec(curr: Class[_]): Type = {
if (curr.getSuperclass != null && marker.isAssignableFrom(curr.getSuperclass)) rec(curr.getSuperclass)
else curr.getGenericInterfaces collectFirst {
case c: Class[_] if marker isAssignableFrom c c
case t: ParameterizedType if marker isAssignableFrom t.getRawType.asInstanceOf[Class[_]] t
} match {
case None throw new IllegalArgumentException("cannot find [$marker] in ancestors of [$root]")
case Some(c: Class[_]) if (c == marker) c else rec(c)
case Some(t: ParameterizedType) if (t.getRawType == marker) t else rec(t.getRawType.asInstanceOf[Class[_]])
case _ ??? // cannot happen due to collectFirst
}
}
rec(root)
}
}

View file

@ -8,7 +8,7 @@ import java.util.concurrent.{ ConcurrentHashMap, ConcurrentLinkedQueue }
import com.typesafe.config.Config
import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, QueueBasedMessageQueue, UnboundedMessageQueueSemantics }
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedQueueBasedMessageQueue }
object PeekMailboxExtension extends ExtensionId[PeekMailboxExtension] with ExtensionIdProvider {
def lookup = this
@ -53,7 +53,7 @@ class PeekMailboxType(settings: ActorSystem.Settings, config: Config) extends Ma
}
class PeekMailbox(owner: ActorRef, system: ActorSystem, maxRetries: Int)
extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
extends UnboundedQueueBasedMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
/*

View file

@ -1,14 +1,96 @@
.. _mailboxes-java:
Mailboxes
=========
#########
An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``.
Normally each ``Actor`` has its own mailbox, but with for example a ``BalancingDispatcher``
all actors with the same ``BalancingDispatcher`` will share a single instance.
Mailbox Selection
=================
Requiring a Message Queue Type for an Actor
-------------------------------------------
It is possible to require a certain type of message queue for a certain type of actor
by having that actor implement the parameterized interface :class:`RequiresMessageQueue`. Here is
an example:
.. includecode:: code/docs/actor/MyBoundedUntypedActor.java#my-bounded-untyped-actor
The type parameter to the :class:`RequiresMessageQueue` interface needs to be mapped to a mailbox in
configuration like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala
:include: bounded-mailbox-config,required-mailbox-config
Now every time you create an actor of type :class:`MyBoundedUntypedActor` it will try to get a bounded
mailbox. If the actor has a different mailbox configured in deployment, either directly or via
a dispatcher with a specified mailbox type, then that will override this mapping.
.. note::
The type of the queue in the mailbox created for an actor will be checked against the required type in the
interface and if the queue doesn't implement the required type then actor creation will fail.
Requiring a Message Queue Type for a Dispatcher
-----------------------------------------------
A dispatcher may also have a requirement for the mailbox type used by the
actors running on it. An example is the BalancingDispatcher which requires a
message queue that is thread-safe for multiple concurrent consumers. Such a
requirement is formulated within the dispatcher configuration section like
this::
my-dispatcher {
mailbox-requirement = org.example.MyInterface
}
The given requirement names a class or interface which will then be ensured to
be a supertype of the message queues implementation. In case of a
conflict—e.g. if the actor requires a mailbox type which does not satisfy this
requirement—then actor creation will fail.
How the Mailbox Type is Selected
--------------------------------
When an actor is created, the :class:`ActorRefProvider` first determines the
dispatcher which will execute it. Then the mailbox is determined as follows:
1. If the actors deployment configuration section contains a ``mailbox`` key
then that names a configuration section describing the mailbox type to be
used.
2. If the actors ``Props`` contains a mailbox selection—i.e. ``withMailbox``
was called on it—then that names a configuration section describing the
mailbox type to be used.
3. If the dispatchers configuration section contains a ``mailbox-type`` key
the same section will be used to configure the mailbox type.
4. If the actor requires a mailbox type as described above then the mapping for
that requirement will be used to determine the mailbox type to be used; if
that fails then the dispatchers requirement—if any—will be tried instead.
5. If the dispatcher requires a mailbox type as described above then the
mapping for that requirement will be used to determine the mailbox type to
be used.
6. The default mailbox ``akka.actor.default-mailbox`` will be used.
Which Configuration is pass to the Mailbox Type
-----------------------------------------------
Each mailbox type is implemented by a class which extends :class:`MailboxType`
and takes two constructor arguments: a :class:`ActorSystem.Settings` object and
a :class:`Config` section. The latter is computed by obtaining the named
configuration section from the actor systems configuration, overriding its
``id`` key with the configuration path of the mailbox type and adding a
fall-back to the default mailbox configuration section.
Builtin Mailbox Implementations
-------------------------------
===============================
Akka comes shipped with a number of default mailbox implementations:
@ -47,7 +129,7 @@ Akka comes shipped with a number of default mailbox implementations:
* Durable mailboxes, see :ref:`durable-mailboxes-java`.
Mailbox configuration examples
------------------------------
==============================
How to create a PriorityMailbox:
@ -75,44 +157,8 @@ Or code like this:
.. includecode:: code/docs/dispatcher/DispatcherDocTest.java#defining-mailbox-in-code
Requiring a message queue type for an Actor
-------------------------------------------
It is possible to require a certain type of message queue for a certain type of actor
by having that actor implement the parameterized interface :class:`RequiresMessageQueue`. Here is
an example:
.. includecode:: code/docs/actor/MyBoundedUntypedActor.java#my-bounded-untyped-actor
The type parameter to the :class:`RequiresMessageQueue` interface needs to be mapped to a mailbox in
configuration like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala
:include: bounded-mailbox-config,required-mailbox-config
Now every time you create an actor of type :class:`MyBoundedUntypedActor` it will try to get a bounded
mailbox. If the actor has a different mailbox configured in deployment, either directly or via
a dispatcher with a specified mailbox type, then that will override this mapping.
.. note::
The type of the queue in the mailbox created for an actor will be checked against the required type in the
interface and if the queue doesn't implement the required type then actor creation will fail.
Mailbox configuration precedence
--------------------------------
The order of precedence for the mailbox type of an actor, where lower numbers override higher, is:
1. Mailbox type configured in the deployment of the actor
2. Mailbox type configured on the dispatcher of the actor
3. Mailbox type configured on the Props of the actor
4. Mailbox type configured via message queue requirement
Creating your own Mailbox type
------------------------------
==============================
An example is worth a thousand quacks:
@ -135,7 +181,7 @@ configuration, or the mailbox configuration.
Special Semantics of ``system.actorOf``
---------------------------------------
=======================================
In order to make ``system.actorOf`` both synchronous and non-blocking while
keeping the return type :class:`ActorRef` (and the semantics that the returned

View file

@ -348,3 +348,20 @@ message. Therefore the following is now safe::
context.stop(target)
context.unwatch(target)
Dispatcher and Mailbox Implementation Changes
=============================================
This point is only relevant if you have implemented a custom mailbox or
dispatcher and want to migrate that to Akka 2.2. The constructor signature of
:class:`MessageDispatcher` has changed, it now takes a
:class:`MessageDispatcherConfigurator` instead of
:class:`DispatcherPrerequisites`. Its :class:`createMailbox` method now
receives one more argument of type :class:`MailboxType`, which is the mailbox
type determined by the :class:`ActorRefProvider` for the actor based on its
deployment. The :class:`DispatcherPrerequisites` now include a
:class:`Mailboxes` instance which can be used for resolving mailbox references.
The constructor signatures of the built-in dispatcher implementation have been
adapted accordingly. The traits describing mailbox semantics have been
separated from the implementation traits.

View file

@ -203,8 +203,7 @@ object DispatcherDocSpec {
import akka.dispatch.{
Envelope,
MessageQueue,
QueueBasedMessageQueue,
UnboundedMessageQueueSemantics
UnboundedQueueBasedMessageQueue
}
// This constructor signature must exist, it will be called by Akka
@ -213,7 +212,7 @@ object DispatcherDocSpec {
// The create method is called to create the MessageQueue
final override def create(owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue =
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
new UnboundedQueueBasedMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}

View file

@ -40,7 +40,7 @@ class RouterViaProgramDocSpec extends AkkaSpec with ImplicitSender {
val actor3 = system.actorOf(Props[ExampleActor1], "actor3")
val routees = Vector[String]("/user/actor1", "/user/actor2", "/user/actor3")
val router = system.actorOf(
Props().withRouter(RoundRobinRouter(routees = routees)))
Props.empty.withRouter(RoundRobinRouter(routees = routees)))
//#programmaticRoutingRouteePaths
1 to 6 foreach { i router ! Message1(i) }
val received = receiveN(6, 5.seconds.dilated)

View file

@ -1,14 +1,96 @@
.. _mailboxes-scala:
Mailboxes
=========
#########
An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``.
Normally each ``Actor`` has its own mailbox, but with for example a ``BalancingDispatcher``
all actors with the same ``BalancingDispatcher`` will share a single instance.
Mailbox Selection
=================
Requiring a Message Queue Type for an Actor
-------------------------------------------
It is possible to require a certain type of message queue for a certain type of actor
by having that actor extend the parameterized trait :class:`RequiresMessageQueue`. Here is
an example:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#required-mailbox-class
The type parameter to the :class:`RequiresMessageQueue` trait needs to be mapped to a mailbox in
configuration like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala
:include: bounded-mailbox-config,required-mailbox-config
Now every time you create an actor of type :class:`MyBoundedActor` it will try to get a bounded
mailbox. If the actor has a different mailbox configured in deployment, either directly or via
a dispatcher with a specified mailbox type, then that will override this mapping.
.. note::
The type of the queue in the mailbox created for an actor will be checked against the required type in the
trait and if the queue doesn't implement the required type then actor creation will fail.
Requiring a Message Queue Type for a Dispatcher
-----------------------------------------------
A dispatcher may also have a requirement for the mailbox type used by the
actors running on it. An example is the BalancingDispatcher which requires a
message queue that is thread-safe for multiple concurrent consumers. Such a
requirement is formulated within the dispatcher configuration section like
this::
my-dispatcher {
mailbox-requirement = org.example.MyInterface
}
The given requirement names a class or interface which will then be ensured to
be a supertype of the message queues implementation. In case of a
conflict—e.g. if the actor requires a mailbox type which does not satisfy this
requirement—then actor creation will fail.
How the Mailbox Type is Selected
--------------------------------
When an actor is created, the :class:`ActorRefProvider` first determines the
dispatcher which will execute it. Then the mailbox is determined as follows:
1. If the actors deployment configuration section contains a ``mailbox`` key
then that names a configuration section describing the mailbox type to be
used.
2. If the actors ``Props`` contains a mailbox selection—i.e. ``withMailbox``
was called on it—then that names a configuration section describing the
mailbox type to be used.
3. If the dispatchers configuration section contains a ``mailbox-type`` key
the same section will be used to configure the mailbox type.
4. If the actor requires a mailbox type as described above then the mapping for
that requirement will be used to determine the mailbox type to be used; if
that fails then the dispatchers requirement—if any—will be tried instead.
5. If the dispatcher requires a mailbox type as described above then the
mapping for that requirement will be used to determine the mailbox type to
be used.
6. The default mailbox ``akka.actor.default-mailbox`` will be used.
Which Configuration is pass to the Mailbox Type
-----------------------------------------------
Each mailbox type is implemented by a class which extends :class:`MailboxType`
and takes two constructor arguments: a :class:`ActorSystem.Settings` object and
a :class:`Config` section. The latter is computed by obtaining the named
configuration section from the actor systems configuration, overriding its
``id`` key with the configuration path of the mailbox type and adding a
fall-back to the default mailbox configuration section.
Builtin implementations
-----------------------
=======================
Akka comes shipped with a number of default mailbox implementations:
@ -47,7 +129,7 @@ Akka comes shipped with a number of default mailbox implementations:
* Durable mailboxes, see :ref:`durable-mailboxes-scala`.
Mailbox configuration examples
------------------------------
==============================
How to create a PriorityMailbox:
@ -75,44 +157,8 @@ Or code like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-mailbox-in-code
Requiring a message queue type for an Actor
-------------------------------------------
It is possible to require a certain type of message queue for a certain type of actor
by having that actor extend the parameterized trait :class:`RequiresMessageQueue`. Here is
an example:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#required-mailbox-class
The type parameter to the :class:`RequiresMessageQueue` trait needs to be mapped to a mailbox in
configuration like this:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala
:include: bounded-mailbox-config,required-mailbox-config
Now every time you create an actor of type :class:`MyBoundedActor` it will try to get a bounded
mailbox. If the actor has a different mailbox configured in deployment, either directly or via
a dispatcher with a specified mailbox type, then that will override this mapping.
.. note::
The type of the queue in the mailbox created for an actor will be checked against the required type in the
trait and if the queue doesn't implement the required type then actor creation will fail.
Mailbox configuration precedence
--------------------------------
The order of precedence for the mailbox type of an actor, where lower numbers override higher, is:
1. Mailbox type configured in the deployment of the actor
2. Mailbox type configured on the dispatcher of the actor
3. Mailbox type configured on the Props of the actor
4. Mailbox type configured via message queue requirement
Creating your own Mailbox type
------------------------------
==============================
An example is worth a thousand quacks:
@ -133,7 +179,7 @@ configuration, or the mailbox configuration.
Special Semantics of ``system.actorOf``
---------------------------------------
=======================================
In order to make ``system.actorOf`` both synchronous and non-blocking while
keeping the return type :class:`ActorRef` (and the semantics that the returned

View file

@ -272,6 +272,14 @@ private[akka] class RemoteActorRefProvider(
} else if (props.deploy.scope == LocalScope) {
throw new IllegalArgumentException(s"configuration requested remote deployment for local-only Props at [$path]")
} else try {
try {
// for consistency we check configuration of dispatcher and mailbox locally
val dispatcher = system.dispatchers.lookup(props.dispatcher)
system.mailboxes.getMailboxType(props, dispatcher.configurator.config)
} catch {
case NonFatal(e) throw new IllegalArgumentException(
s"configuration problem while creating [$path] with dispatcher [${props.dispatcher}] and mailbox [${props.mailbox}]", e)
}
val localAddress = transport.localAddressForRemote(addr)
val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements).
withUid(path.uid)

View file

@ -126,18 +126,16 @@ object CallingThreadDispatcher {
*
* @since 1.1
*/
class CallingThreadDispatcher(
_prerequisites: DispatcherPrerequisites,
val mailboxType: MailboxType,
val mailboxTypeConfigured: Boolean) extends MessageDispatcher(_prerequisites) {
class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher(_configurator) {
import CallingThreadDispatcher._
import configurator.prerequisites._
val log = akka.event.Logging(prerequisites.eventStream, "CallingThreadDispatcher")
val log = akka.event.Logging(eventStream, "CallingThreadDispatcher")
override def id: String = Id
protected[akka] override def createMailbox(actor: akka.actor.Cell) =
new CallingThreadMailbox(actor, getMailboxType(actor, mailboxType, mailboxTypeConfigured))
protected[akka] override def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType) =
new CallingThreadMailbox(actor, mailboxType)
protected[akka] override def shutdown() {}
@ -303,7 +301,7 @@ class CallingThreadDispatcher(
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance = new CallingThreadDispatcher(prerequisites, mailboxType(), mailBoxTypeConfigured)
private val instance = new CallingThreadDispatcher(this)
override def dispatcher(): MessageDispatcher = instance
}

View file

@ -20,11 +20,15 @@ import akka.pattern.ask
*/
class TestActorRef[T <: Actor](
_system: ActorSystem,
_prerequisites: DispatcherPrerequisites,
_props: Props,
_supervisor: ActorRef,
name: String)
extends {
val props =
_props.withDispatcher(
if (_props.deploy.dispatcher == Deploy.NoDispatcherGiven) CallingThreadDispatcher.Id
else _props.dispatcher)
val dispatcher = _system.dispatchers.lookup(props.dispatcher)
private val disregard = _supervisor match {
case l: LocalActorRef l.underlying.reserveChild(name)
case r: RepointableActorRef r.underlying match {
@ -36,9 +40,9 @@ class TestActorRef[T <: Actor](
}
} with LocalActorRef(
_system.asInstanceOf[ActorSystemImpl],
_props.withDispatcher(
if (_props.dispatcher == Dispatchers.DefaultDispatcherId) CallingThreadDispatcher.Id
else _props.dispatcher),
props,
dispatcher,
_system.mailboxes.getMailboxType(props, dispatcher.configurator.config),
_supervisor.asInstanceOf[InternalActorRef],
_supervisor.path / name) {
@ -47,8 +51,9 @@ class TestActorRef[T <: Actor](
import TestActorRef.InternalGetActor
protected override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, supervisor) {
protected override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props,
dispatcher: MessageDispatcher, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, dispatcher, supervisor) {
override def autoReceiveMessage(msg: Envelope) {
msg.message match {
case InternalGetActor sender ! actor
@ -132,7 +137,8 @@ object TestActorRef {
apply[T](props, system.asInstanceOf[ActorSystemImpl].guardian, name)
def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = {
new TestActorRef(system.asInstanceOf[ActorSystemImpl], system.dispatchers.prerequisites, props, supervisor.asInstanceOf[InternalActorRef], name)
val sysImpl = system.asInstanceOf[ActorSystemImpl]
new TestActorRef(sysImpl, props, supervisor.asInstanceOf[InternalActorRef], name)
}
def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)

View file

@ -8,6 +8,8 @@ import akka.actor._
import scala.concurrent.duration.Duration
import akka.dispatch.DispatcherPrerequisites
import scala.concurrent.duration.FiniteDuration
import akka.dispatch.MessageDispatcher
import akka.dispatch.MailboxType
/**
* This is a specialised form of the TestActorRef with support for querying and
@ -35,11 +37,10 @@ import scala.concurrent.duration.FiniteDuration
*/
class TestFSMRef[S, D, T <: Actor](
system: ActorSystem,
_prerequisites: DispatcherPrerequisites,
props: Props,
supervisor: ActorRef,
name: String)(implicit ev: T <:< FSM[S, D])
extends TestActorRef(system, _prerequisites, props, supervisor, name) {
extends TestActorRef(system, props, supervisor, name) {
private def fsm: T = underlyingActor
@ -93,11 +94,11 @@ object TestFSMRef {
def apply[S, D, T <: Actor](factory: T)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
new TestFSMRef(impl, system.dispatchers.prerequisites, Props(creator = () factory), impl.guardian.asInstanceOf[InternalActorRef], TestActorRef.randomName)
new TestFSMRef(impl, Props(creator = () factory), impl.guardian.asInstanceOf[InternalActorRef], TestActorRef.randomName)
}
def apply[S, D, T <: Actor](factory: T, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
new TestFSMRef(impl, system.dispatchers.prerequisites, Props(creator = () factory), impl.guardian.asInstanceOf[InternalActorRef], name)
new TestFSMRef(impl, Props(creator = () factory), impl.guardian.asInstanceOf[InternalActorRef], name)
}
}