diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/MailboxSelectorTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/MailboxSelectorTest.java new file mode 100644 index 0000000000..cfb0205d43 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/MailboxSelectorTest.java @@ -0,0 +1,14 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.actor.typed; + +public class MailboxSelectorTest { + // Compile time only test to verify + // mailbox factories are accessible from Java + + private MailboxSelector def = MailboxSelector.defaultMailbox(); + private MailboxSelector bounded = MailboxSelector.bounded(1000); + private MailboxSelector conf = MailboxSelector.fromConfig("somepath"); +} diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MailboxDocTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MailboxDocTest.java new file mode 100644 index 0000000000..47b6ba77a2 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MailboxDocTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.akka.typed; + +import akka.Done; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.MailboxSelector; +import akka.actor.typed.javadsl.Behaviors; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Test; + +public class MailboxDocTest { + + @ClassRule + public static final TestKitJunitResource testKit = + new TestKitJunitResource(ConfigFactory.load("mailbox-config-sample.conf")); + + @Test + public void startSomeActorsWithDifferentMailboxes() { + TestProbe testProbe = testKit.createTestProbe(); + Behavior childBehavior = Behaviors.empty(); + + Behavior setup = + Behaviors.setup( + context -> { + // #select-mailbox + context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100)); + + context.spawn( + childBehavior, + "from-config-mailbox-child", + MailboxSelector.fromConfig("my-app.my-special-mailbox")); + // #select-mailbox + + testProbe.ref().tell(Done.getInstance()); + return Behaviors.stopped(); + }); + + ActorRef ref = testKit.spawn(setup); + testProbe.receiveMessage(); + } +} diff --git a/akka-actor-typed-tests/src/test/resources/mailbox-config-sample.conf b/akka-actor-typed-tests/src/test/resources/mailbox-config-sample.conf new file mode 100644 index 0000000000..0a9594b650 --- /dev/null +++ b/akka-actor-typed-tests/src/test/resources/mailbox-config-sample.conf @@ -0,0 +1,5 @@ +my-app { + my-special-mailbox { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + } +} \ No newline at end of file diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/MailboxSelectorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/MailboxSelectorSpec.scala new file mode 100644 index 0000000000..d3ecaf2717 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/MailboxSelectorSpec.scala @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed + +import akka.actor.ActorCell +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.internal.adapter.ActorContextAdapter +import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.scaladsl.Behaviors +import akka.dispatch.BoundedMessageQueueSemantics +import akka.dispatch.BoundedNodeMessageQueue +import akka.dispatch.MessageQueue +import akka.dispatch.UnboundedMessageQueueSemantics +import akka.testkit.EventFilter +import akka.testkit.TestLatch +import org.scalatest.WordSpecLike + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class MailboxSelectorSpec extends ScalaTestWithActorTestKit(""" + specific-mailbox { + mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox" + mailbox-capacity = 4 + } + akka.loggers = [ akka.testkit.TestEventListener ] + """) with WordSpecLike { + + // FIXME #24348: eventfilter support in typed testkit + import scaladsl.adapter._ + implicit val untypedSystem = system.toUntyped + + case class WhatsYourMailbox(replyTo: ActorRef[MessageQueue]) + private def behavior: Behavior[WhatsYourMailbox] = + Behaviors.setup { context => + Behaviors.receiveMessage[WhatsYourMailbox] { + case WhatsYourMailbox(replyTo) => + val mailbox = context match { + case adapter: ActorContextAdapter[_] => + adapter.untypedContext match { + case cell: ActorCell => + cell.mailbox.messageQueue + } + } + + replyTo ! mailbox + Behaviors.stopped + } + } + + "The Mailbox selectors" must { + "default to unbounded" in { + val actor = spawn(behavior) + val mailbox = actor.ask(WhatsYourMailbox).futureValue + mailbox shouldBe a[UnboundedMessageQueueSemantics] + } + + "select a bounded mailbox" in { + val actor = spawn(behavior, MailboxSelector.bounded(3)) + val mailbox = actor.ask(WhatsYourMailbox).futureValue + mailbox shouldBe a[BoundedMessageQueueSemantics] + // capacity is private so only way to test is to fill mailbox + } + + "set capacity on a bounded mailbox" in { + val latch = TestLatch(1) + val actor = spawn(Behaviors.receiveMessage[String] { + case "one" => + // block here so we can fill mailbox up + Await.ready(latch, 10.seconds) + Behaviors.same + case _ => + Behaviors.same + }, MailboxSelector.bounded(2)) + actor ! "one" // actor will block here + actor ! "two" + EventFilter.warning(start = "received dead letter:", occurrences = 1).intercept { + // one or both of these doesn't fit in mailbox + // depending on race with how fast actor consumes + actor ! "three" + actor ! "four" + } + latch.open() + } + + "select an arbitrary mailbox from config" in { + val actor = spawn(behavior, MailboxSelector.fromConfig("specific-mailbox")) + val mailbox = actor.ask(WhatsYourMailbox).futureValue + mailbox shouldBe a[BoundedMessageQueueSemantics] + mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4) + + } + } + +} diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MailboxDocSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MailboxDocSpec.scala new file mode 100644 index 0000000000..7b87fc80e5 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MailboxDocSpec.scala @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.akka.typed + +import akka.Done +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.Behavior +import akka.actor.typed.MailboxSelector +import akka.actor.typed.scaladsl.Behaviors +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +class MailboxDocSpec + extends ScalaTestWithActorTestKit(ConfigFactory.load("mailbox-config-sample.conf")) + with WordSpecLike { + + "Specifying mailbox through props" must { + "work" in { + val probe = createTestProbe[Done]() + val childBehavior: Behavior[String] = Behaviors.empty + val parent: Behavior[Unit] = Behaviors.setup { context => + // #select-mailbox + context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100)) + + val props = MailboxSelector.fromConfig("my-app.my-special-mailbox") + context.spawn(childBehavior, "from-config-mailbox-child", props) + // #select-mailbox + + probe.ref ! Done + Behaviors.stopped + } + spawn(parent) + + probe.receiveMessage() + } + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala index 493f1619f4..bcc23df2f0 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala @@ -179,3 +179,35 @@ object DispatcherSelector { */ def sameAsParent(): DispatcherSelector = DispatcherSameAsParent.empty } + +/** + * Not for user extension. + */ +@DoNotInherit +abstract class MailboxSelector extends Props + +object MailboxSelector { + + /** + * Scala API: The default mailbox is unbounded and backed by a [[java.util.concurrent.ConcurrentLinkedQueue]] + */ + def default(): MailboxSelector = DefaultMailboxSelector.empty + + /** + * Java API: The default mailbox is unbounded and backed by a [[java.util.concurrent.ConcurrentLinkedQueue]] + */ + def defaultMailbox(): MailboxSelector = default() + + /** + * A mailbox with a max capacity after which new messages are dropped (passed to deadletters). + * @param capacity The maximum number of messages in the mailbox before new messages are dropped + */ + def bounded(capacity: Int): MailboxSelector = BoundedMailboxSelector(capacity) + + /** + * Select a mailbox from the config file using an absolute config path. + * + * This is a power user settings default or bounded should be preferred unless you know what you are doing. + */ + def fromConfig(path: String): MailboxSelector = MailboxFromConfigSelector(path) +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PropsImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PropsImpl.scala index 9c19118769..6b4aa89577 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PropsImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PropsImpl.scala @@ -4,8 +4,7 @@ package akka.actor.typed.internal -import akka.actor.typed.DispatcherSelector -import akka.actor.typed.Props +import akka.actor.typed.{ DispatcherSelector, MailboxSelector, Props } import akka.annotation.InternalApi /** @@ -40,4 +39,19 @@ import akka.annotation.InternalApi val empty = DispatcherSameAsParent(EmptyProps) } + final case class DefaultMailboxSelector(next: Props = Props.empty) extends MailboxSelector { + def withNext(next: Props): Props = copy(next = next) + } + object DefaultMailboxSelector { + val empty = DefaultMailboxSelector(EmptyProps) + } + + final case class BoundedMailboxSelector(capacity: Int, next: Props = Props.empty) extends MailboxSelector { + def withNext(next: Props): Props = copy(next = next) + } + + final case class MailboxFromConfigSelector(path: String, next: Props = Props.empty) extends MailboxSelector { + def withNext(next: Props): Props = copy(next = next) + } + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala index 2ab0bc727f..eafc90bdff 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala @@ -7,9 +7,11 @@ package akka.actor.typed.internal.adapter import akka.actor.Deploy import akka.actor.typed.Behavior import akka.actor.typed.DispatcherSelector +import akka.actor.typed.MailboxSelector import akka.actor.typed.Props import akka.actor.typed.internal.PropsImpl._ import akka.annotation.InternalApi +import akka.dispatch.Mailboxes /** * INTERNAL API @@ -21,11 +23,22 @@ import akka.annotation.InternalApi rethrowTypedFailure: Boolean = true): akka.actor.Props = { val props = akka.actor.Props(new ActorAdapter(behavior(), rethrowTypedFailure)) - (deploy.firstOrElse[DispatcherSelector](DispatcherDefault.empty) match { + val p1 = (deploy.firstOrElse[DispatcherSelector](DispatcherDefault.empty) match { case _: DispatcherDefault => props case DispatcherFromConfig(name, _) => props.withDispatcher(name) case _: DispatcherSameAsParent => props.withDispatcher(Deploy.DispatcherSameAsParent) }).withDeploy(Deploy.local) // disallow remote deployment for typed actors + + val p2 = deploy.firstOrElse[MailboxSelector](MailboxSelector.default()) match { + case _: DefaultMailboxSelector => p1 + case BoundedMailboxSelector(capacity, _) => + // specific support in untyped Mailboxes + p1.withMailbox(s"${Mailboxes.BoundedCapacityPrefix}$capacity") + case MailboxFromConfigSelector(path, _) => + props.withMailbox(path) + } + + p2.withDeploy(Deploy.local) // disallow remote deployment for typed actors } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala index 2c25533a14..712d4221ba 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala @@ -6,6 +6,8 @@ package akka.dispatch import java.lang.reflect.ParameterizedType import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicReference + import akka.ConfigurationException import akka.actor.{ Actor, ActorRef, ActorSystem, DeadLetter, Deploy, DynamicAccess, Props } import akka.dispatch.sysmsg.{ @@ -18,13 +20,15 @@ import akka.event.EventStream import akka.event.Logging.Warning import akka.util.Reflect import com.typesafe.config.{ Config, ConfigFactory } + import scala.util.control.NonFatal -import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec +import scala.concurrent.duration.Duration object Mailboxes { final val DefaultMailboxId = "akka.actor.default-mailbox" final val NoMailboxRequirement = "" + final val BoundedCapacityPrefix = "bounded-capacity:" } private[akka] class Mailboxes( @@ -202,8 +206,14 @@ private[akka] class Mailboxes( // It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup. val newConfigurator = id match { // TODO RK remove these two for Akka 2.3 - case "unbounded" => UnboundedMailbox() - case "bounded" => new BoundedMailbox(settings, config(id)) + case "unbounded" => UnboundedMailbox() + case "bounded" => new BoundedMailbox(settings, config(id)) + case _ if id.startsWith(BoundedCapacityPrefix) => + // hack to allow programmatic set of capacity through props in akka-typed but still share + // mailbox configurators for the same size + val capacity = id.split(':')(1).toInt + new BoundedMailbox(capacity, Duration.Zero) + case _ => if (!settings.config.hasPath(id)) throw new ConfigurationException(s"Mailbox Type [$id] not configured") val conf = config(id) diff --git a/akka-docs/src/main/paradox/dispatchers.md b/akka-docs/src/main/paradox/dispatchers.md index 22e220955c..d87ab200e7 100644 --- a/akka-docs/src/main/paradox/dispatchers.md +++ b/akka-docs/src/main/paradox/dispatchers.md @@ -13,7 +13,7 @@ For the new API see @ref[dispatchers](typed/dispatchers.md). ## Dependency -Dispatchers are part of core akka, which means that they are part of the akka-actor dependency: +Dispatchers are part of core Akka, which means that they are part of the akka-actor dependency: @@dependency[sbt,Maven,Gradle] { group="com.typesafe.akka" diff --git a/akka-docs/src/main/paradox/logging.md b/akka-docs/src/main/paradox/logging.md index 472c64b39e..4aac52cab1 100644 --- a/akka-docs/src/main/paradox/logging.md +++ b/akka-docs/src/main/paradox/logging.md @@ -532,7 +532,7 @@ trigger emails and other notifications immediately. Markers are available through the LoggingAdapters, when obtained via `Logging.withMarker`. The first argument passed into all log calls then should be a `akka.event.LogMarker`. -The slf4j bridge provided by akka in `akka-slf4j` will automatically pick up this marker value and make it available to SLF4J. +The slf4j bridge provided by Akka in `akka-slf4j` will automatically pick up this marker value and make it available to SLF4J. For example you could use it like this: ``` diff --git a/akka-docs/src/main/paradox/persistence-query.md b/akka-docs/src/main/paradox/persistence-query.md index 07e734a6e8..4a0bf02096 100644 --- a/akka-docs/src/main/paradox/persistence-query.md +++ b/akka-docs/src/main/paradox/persistence-query.md @@ -18,7 +18,7 @@ Akka persistence query complements @ref:[Persistence](persistence.md) by providi query interface that various journal plugins can implement in order to expose their query capabilities. The most typical use case of persistence query is implementing the so-called query side (also known as "read side") -in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using akka +in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using Akka persistence) is completely separated from the "query side". Akka Persistence Query itself is *not* directly the query side of an application, however it can help to migrate data from the write side to the query side database. In very simple scenarios Persistence Query may be powerful enough to fulfill the query needs of your app, however we highly diff --git a/akka-docs/src/main/paradox/typed/index.md b/akka-docs/src/main/paradox/typed/index.md index dc11c445c2..00c8a626d3 100644 --- a/akka-docs/src/main/paradox/typed/index.md +++ b/akka-docs/src/main/paradox/typed/index.md @@ -6,6 +6,7 @@ * [actors](actors.md) * [dispatchers](dispatchers.md) +* [mailboxes](mailboxes.md) * [coexisting](coexisting.md) * [actor-lifecycle](actor-lifecycle.md) * [interaction patterns](interaction-patterns.md) diff --git a/akka-docs/src/main/paradox/typed/mailboxes.md b/akka-docs/src/main/paradox/typed/mailboxes.md new file mode 100644 index 0000000000..2b161c6085 --- /dev/null +++ b/akka-docs/src/main/paradox/typed/mailboxes.md @@ -0,0 +1,40 @@ +# Mailboxes + +## Dependency + +Mailboxes are part of core Akka, which means that they are part of the akka-actor-typed dependency: + +@@dependency[sbt,Maven,Gradle] { + group="com.typesafe.akka" + artifact="akka-actor-typed_$scala.binary_version$" + version="$akka.version$" +} + +## Introduction + +Each actor in Akka has a `Mailbox`, this is where the messages are enqueued before being processed by the actor. + +By default an unbounded mailbox is used, this means any number of messages can be enqueued into the mailbox. + +The unbounded mailbox is a convenient default but in a scenario where messages are added to the mailbox faster +than the actor can process them, this can lead to the application running out of memory. +For this reason a bounded mailbox can be specified, the bounded mailbox will pass new messages to `deadletters` when the +mailbox is full. + +For advanced use cases it is also possible to defer mailbox selection to config by pointing to a config path. + +## Selecting what mailbox is used + +To select mailbox for an actor use `MailboxSelector` to create a `Props` instance for spawning your actor: + +Scala +: @@snip [MailboxDocSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MailboxDocSpec.scala) { #select-mailbox } + +Java +: @@snip [MailboxDocTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/MailboxDocTest.java) { #select-mailbox } + +`fromConfig` takes an absolute config path to a block defining the dispatcher in the config file like this: + +@@snip [MailboxDocSpec.scala](/akka-actor-typed-tests/src/test/resources/mailbox-config-sample.conf) { } + +For more details on advanced mailbox config and custom mailbox implementations, see @ref[Classic Mailboxes](../mailboxes.md#builtin-mailbox-implementations).