Select mailbox in Akka typed actors #27124

This commit is contained in:
Johan Andrén 2019-07-16 13:52:23 +02:00 committed by GitHub
parent d03294d359
commit 2626f17747
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 323 additions and 9 deletions

View file

@ -0,0 +1,14 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed;
public class MailboxSelectorTest {
// Compile time only test to verify
// mailbox factories are accessible from Java
private MailboxSelector def = MailboxSelector.defaultMailbox();
private MailboxSelector bounded = MailboxSelector.bounded(1000);
private MailboxSelector conf = MailboxSelector.fromConfig("somepath");
}

View file

@ -0,0 +1,48 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.typed;
import akka.Done;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.MailboxSelector;
import akka.actor.typed.javadsl.Behaviors;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
public class MailboxDocTest {
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(ConfigFactory.load("mailbox-config-sample.conf"));
@Test
public void startSomeActorsWithDifferentMailboxes() {
TestProbe<Done> testProbe = testKit.createTestProbe();
Behavior<String> childBehavior = Behaviors.empty();
Behavior<Void> setup =
Behaviors.setup(
context -> {
// #select-mailbox
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100));
context.spawn(
childBehavior,
"from-config-mailbox-child",
MailboxSelector.fromConfig("my-app.my-special-mailbox"));
// #select-mailbox
testProbe.ref().tell(Done.getInstance());
return Behaviors.stopped();
});
ActorRef<Void> ref = testKit.spawn(setup);
testProbe.receiveMessage();
}
}

View file

@ -0,0 +1,5 @@
my-app {
my-special-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
}

View file

@ -0,0 +1,97 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
import akka.actor.ActorCell
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.internal.adapter.ActorContextAdapter
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.dispatch.BoundedMessageQueueSemantics
import akka.dispatch.BoundedNodeMessageQueue
import akka.dispatch.MessageQueue
import akka.dispatch.UnboundedMessageQueueSemantics
import akka.testkit.EventFilter
import akka.testkit.TestLatch
import org.scalatest.WordSpecLike
import scala.concurrent.Await
import scala.concurrent.duration._
class MailboxSelectorSpec extends ScalaTestWithActorTestKit("""
specific-mailbox {
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 4
}
akka.loggers = [ akka.testkit.TestEventListener ]
""") with WordSpecLike {
// FIXME #24348: eventfilter support in typed testkit
import scaladsl.adapter._
implicit val untypedSystem = system.toUntyped
case class WhatsYourMailbox(replyTo: ActorRef[MessageQueue])
private def behavior: Behavior[WhatsYourMailbox] =
Behaviors.setup { context =>
Behaviors.receiveMessage[WhatsYourMailbox] {
case WhatsYourMailbox(replyTo) =>
val mailbox = context match {
case adapter: ActorContextAdapter[_] =>
adapter.untypedContext match {
case cell: ActorCell =>
cell.mailbox.messageQueue
}
}
replyTo ! mailbox
Behaviors.stopped
}
}
"The Mailbox selectors" must {
"default to unbounded" in {
val actor = spawn(behavior)
val mailbox = actor.ask(WhatsYourMailbox).futureValue
mailbox shouldBe a[UnboundedMessageQueueSemantics]
}
"select a bounded mailbox" in {
val actor = spawn(behavior, MailboxSelector.bounded(3))
val mailbox = actor.ask(WhatsYourMailbox).futureValue
mailbox shouldBe a[BoundedMessageQueueSemantics]
// capacity is private so only way to test is to fill mailbox
}
"set capacity on a bounded mailbox" in {
val latch = TestLatch(1)
val actor = spawn(Behaviors.receiveMessage[String] {
case "one" =>
// block here so we can fill mailbox up
Await.ready(latch, 10.seconds)
Behaviors.same
case _ =>
Behaviors.same
}, MailboxSelector.bounded(2))
actor ! "one" // actor will block here
actor ! "two"
EventFilter.warning(start = "received dead letter:", occurrences = 1).intercept {
// one or both of these doesn't fit in mailbox
// depending on race with how fast actor consumes
actor ! "three"
actor ! "four"
}
latch.open()
}
"select an arbitrary mailbox from config" in {
val actor = spawn(behavior, MailboxSelector.fromConfig("specific-mailbox"))
val mailbox = actor.ask(WhatsYourMailbox).futureValue
mailbox shouldBe a[BoundedMessageQueueSemantics]
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
}
}
}

View file

@ -0,0 +1,40 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.typed
import akka.Done
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.Behavior
import akka.actor.typed.MailboxSelector
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
class MailboxDocSpec
extends ScalaTestWithActorTestKit(ConfigFactory.load("mailbox-config-sample.conf"))
with WordSpecLike {
"Specifying mailbox through props" must {
"work" in {
val probe = createTestProbe[Done]()
val childBehavior: Behavior[String] = Behaviors.empty
val parent: Behavior[Unit] = Behaviors.setup { context =>
// #select-mailbox
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100))
val props = MailboxSelector.fromConfig("my-app.my-special-mailbox")
context.spawn(childBehavior, "from-config-mailbox-child", props)
// #select-mailbox
probe.ref ! Done
Behaviors.stopped
}
spawn(parent)
probe.receiveMessage()
}
}
}

View file

@ -179,3 +179,35 @@ object DispatcherSelector {
*/ */
def sameAsParent(): DispatcherSelector = DispatcherSameAsParent.empty def sameAsParent(): DispatcherSelector = DispatcherSameAsParent.empty
} }
/**
* Not for user extension.
*/
@DoNotInherit
abstract class MailboxSelector extends Props
object MailboxSelector {
/**
* Scala API: The default mailbox is unbounded and backed by a [[java.util.concurrent.ConcurrentLinkedQueue]]
*/
def default(): MailboxSelector = DefaultMailboxSelector.empty
/**
* Java API: The default mailbox is unbounded and backed by a [[java.util.concurrent.ConcurrentLinkedQueue]]
*/
def defaultMailbox(): MailboxSelector = default()
/**
* A mailbox with a max capacity after which new messages are dropped (passed to deadletters).
* @param capacity The maximum number of messages in the mailbox before new messages are dropped
*/
def bounded(capacity: Int): MailboxSelector = BoundedMailboxSelector(capacity)
/**
* Select a mailbox from the config file using an absolute config path.
*
* This is a power user settings default or bounded should be preferred unless you know what you are doing.
*/
def fromConfig(path: String): MailboxSelector = MailboxFromConfigSelector(path)
}

View file

@ -4,8 +4,7 @@
package akka.actor.typed.internal package akka.actor.typed.internal
import akka.actor.typed.DispatcherSelector import akka.actor.typed.{ DispatcherSelector, MailboxSelector, Props }
import akka.actor.typed.Props
import akka.annotation.InternalApi import akka.annotation.InternalApi
/** /**
@ -40,4 +39,19 @@ import akka.annotation.InternalApi
val empty = DispatcherSameAsParent(EmptyProps) val empty = DispatcherSameAsParent(EmptyProps)
} }
final case class DefaultMailboxSelector(next: Props = Props.empty) extends MailboxSelector {
def withNext(next: Props): Props = copy(next = next)
}
object DefaultMailboxSelector {
val empty = DefaultMailboxSelector(EmptyProps)
}
final case class BoundedMailboxSelector(capacity: Int, next: Props = Props.empty) extends MailboxSelector {
def withNext(next: Props): Props = copy(next = next)
}
final case class MailboxFromConfigSelector(path: String, next: Props = Props.empty) extends MailboxSelector {
def withNext(next: Props): Props = copy(next = next)
}
} }

View file

@ -7,9 +7,11 @@ package akka.actor.typed.internal.adapter
import akka.actor.Deploy import akka.actor.Deploy
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.DispatcherSelector import akka.actor.typed.DispatcherSelector
import akka.actor.typed.MailboxSelector
import akka.actor.typed.Props import akka.actor.typed.Props
import akka.actor.typed.internal.PropsImpl._ import akka.actor.typed.internal.PropsImpl._
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.dispatch.Mailboxes
/** /**
* INTERNAL API * INTERNAL API
@ -21,11 +23,22 @@ import akka.annotation.InternalApi
rethrowTypedFailure: Boolean = true): akka.actor.Props = { rethrowTypedFailure: Boolean = true): akka.actor.Props = {
val props = akka.actor.Props(new ActorAdapter(behavior(), rethrowTypedFailure)) val props = akka.actor.Props(new ActorAdapter(behavior(), rethrowTypedFailure))
(deploy.firstOrElse[DispatcherSelector](DispatcherDefault.empty) match { val p1 = (deploy.firstOrElse[DispatcherSelector](DispatcherDefault.empty) match {
case _: DispatcherDefault => props case _: DispatcherDefault => props
case DispatcherFromConfig(name, _) => props.withDispatcher(name) case DispatcherFromConfig(name, _) => props.withDispatcher(name)
case _: DispatcherSameAsParent => props.withDispatcher(Deploy.DispatcherSameAsParent) case _: DispatcherSameAsParent => props.withDispatcher(Deploy.DispatcherSameAsParent)
}).withDeploy(Deploy.local) // disallow remote deployment for typed actors }).withDeploy(Deploy.local) // disallow remote deployment for typed actors
val p2 = deploy.firstOrElse[MailboxSelector](MailboxSelector.default()) match {
case _: DefaultMailboxSelector => p1
case BoundedMailboxSelector(capacity, _) =>
// specific support in untyped Mailboxes
p1.withMailbox(s"${Mailboxes.BoundedCapacityPrefix}$capacity")
case MailboxFromConfigSelector(path, _) =>
props.withMailbox(path)
}
p2.withDeploy(Deploy.local) // disallow remote deployment for typed actors
} }
} }

View file

@ -6,6 +6,8 @@ package akka.dispatch
import java.lang.reflect.ParameterizedType import java.lang.reflect.ParameterizedType
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import akka.ConfigurationException import akka.ConfigurationException
import akka.actor.{ Actor, ActorRef, ActorSystem, DeadLetter, Deploy, DynamicAccess, Props } import akka.actor.{ Actor, ActorRef, ActorSystem, DeadLetter, Deploy, DynamicAccess, Props }
import akka.dispatch.sysmsg.{ import akka.dispatch.sysmsg.{
@ -18,13 +20,15 @@ import akka.event.EventStream
import akka.event.Logging.Warning import akka.event.Logging.Warning
import akka.util.Reflect import akka.util.Reflect
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.duration.Duration
object Mailboxes { object Mailboxes {
final val DefaultMailboxId = "akka.actor.default-mailbox" final val DefaultMailboxId = "akka.actor.default-mailbox"
final val NoMailboxRequirement = "" final val NoMailboxRequirement = ""
final val BoundedCapacityPrefix = "bounded-capacity:"
} }
private[akka] class Mailboxes( private[akka] class Mailboxes(
@ -204,6 +208,12 @@ private[akka] class Mailboxes(
// TODO RK remove these two for Akka 2.3 // TODO RK remove these two for Akka 2.3
case "unbounded" => UnboundedMailbox() case "unbounded" => UnboundedMailbox()
case "bounded" => new BoundedMailbox(settings, config(id)) case "bounded" => new BoundedMailbox(settings, config(id))
case _ if id.startsWith(BoundedCapacityPrefix) =>
// hack to allow programmatic set of capacity through props in akka-typed but still share
// mailbox configurators for the same size
val capacity = id.split(':')(1).toInt
new BoundedMailbox(capacity, Duration.Zero)
case _ => case _ =>
if (!settings.config.hasPath(id)) throw new ConfigurationException(s"Mailbox Type [$id] not configured") if (!settings.config.hasPath(id)) throw new ConfigurationException(s"Mailbox Type [$id] not configured")
val conf = config(id) val conf = config(id)

View file

@ -13,7 +13,7 @@ For the new API see @ref[dispatchers](typed/dispatchers.md).
## Dependency ## Dependency
Dispatchers are part of core akka, which means that they are part of the akka-actor dependency: Dispatchers are part of core Akka, which means that they are part of the akka-actor dependency:
@@dependency[sbt,Maven,Gradle] { @@dependency[sbt,Maven,Gradle] {
group="com.typesafe.akka" group="com.typesafe.akka"

View file

@ -532,7 +532,7 @@ trigger emails and other notifications immediately.
Markers are available through the LoggingAdapters, when obtained via `Logging.withMarker`. Markers are available through the LoggingAdapters, when obtained via `Logging.withMarker`.
The first argument passed into all log calls then should be a `akka.event.LogMarker`. The first argument passed into all log calls then should be a `akka.event.LogMarker`.
The slf4j bridge provided by akka in `akka-slf4j` will automatically pick up this marker value and make it available to SLF4J. The slf4j bridge provided by Akka in `akka-slf4j` will automatically pick up this marker value and make it available to SLF4J.
For example you could use it like this: For example you could use it like this:
``` ```

View file

@ -18,7 +18,7 @@ Akka persistence query complements @ref:[Persistence](persistence.md) by providi
query interface that various journal plugins can implement in order to expose their query capabilities. query interface that various journal plugins can implement in order to expose their query capabilities.
The most typical use case of persistence query is implementing the so-called query side (also known as "read side") The most typical use case of persistence query is implementing the so-called query side (also known as "read side")
in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using akka in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using Akka
persistence) is completely separated from the "query side". Akka Persistence Query itself is *not* directly the query persistence) is completely separated from the "query side". Akka Persistence Query itself is *not* directly the query
side of an application, however it can help to migrate data from the write side to the query side database. In very side of an application, however it can help to migrate data from the write side to the query side database. In very
simple scenarios Persistence Query may be powerful enough to fulfill the query needs of your app, however we highly simple scenarios Persistence Query may be powerful enough to fulfill the query needs of your app, however we highly

View file

@ -6,6 +6,7 @@
* [actors](actors.md) * [actors](actors.md)
* [dispatchers](dispatchers.md) * [dispatchers](dispatchers.md)
* [mailboxes](mailboxes.md)
* [coexisting](coexisting.md) * [coexisting](coexisting.md)
* [actor-lifecycle](actor-lifecycle.md) * [actor-lifecycle](actor-lifecycle.md)
* [interaction patterns](interaction-patterns.md) * [interaction patterns](interaction-patterns.md)

View file

@ -0,0 +1,40 @@
# Mailboxes
## Dependency
Mailboxes are part of core Akka, which means that they are part of the akka-actor-typed dependency:
@@dependency[sbt,Maven,Gradle] {
group="com.typesafe.akka"
artifact="akka-actor-typed_$scala.binary_version$"
version="$akka.version$"
}
## Introduction
Each actor in Akka has a `Mailbox`, this is where the messages are enqueued before being processed by the actor.
By default an unbounded mailbox is used, this means any number of messages can be enqueued into the mailbox.
The unbounded mailbox is a convenient default but in a scenario where messages are added to the mailbox faster
than the actor can process them, this can lead to the application running out of memory.
For this reason a bounded mailbox can be specified, the bounded mailbox will pass new messages to `deadletters` when the
mailbox is full.
For advanced use cases it is also possible to defer mailbox selection to config by pointing to a config path.
## Selecting what mailbox is used
To select mailbox for an actor use `MailboxSelector` to create a `Props` instance for spawning your actor:
Scala
: @@snip [MailboxDocSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MailboxDocSpec.scala) { #select-mailbox }
Java
: @@snip [MailboxDocTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MailboxDocTest.java) { #select-mailbox }
`fromConfig` takes an absolute config path to a block defining the dispatcher in the config file like this:
@@snip [MailboxDocSpec.scala](/akka-actor-typed-tests/src/test/resources/mailbox-config-sample.conf) { }
For more details on advanced mailbox config and custom mailbox implementations, see @ref[Classic Mailboxes](../mailboxes.md#builtin-mailbox-implementations).