additional mailbox selector for typed props (#1096)
* additional mailbox selector for typed props * add unit test * chore change of unit test * Revert "configuration typo" This reverts commit 7917feb32a4b042124513d98068db832cc59a541. * fix pekko imports * mention interoperability in doc * share configuration in tests * revert configuration change * fix new typo * fix jdocs tests * optimized import * mention api version in doc * resolve import issue
This commit is contained in:
parent
db8d20ddea
commit
d1ec224330
10 changed files with 283 additions and 18 deletions
|
|
@ -29,4 +29,27 @@ public class DispatchersDocTest {
|
|||
behavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"));
|
||||
// #spawn-dispatcher
|
||||
}
|
||||
|
||||
public static void spawnDispatchersWithInteroperability(
|
||||
ActorContext<Integer> context, Behavior<String> behavior) {
|
||||
// #interoperability-with-mailbox
|
||||
context.spawn(
|
||||
behavior,
|
||||
"ExplicitDefaultDispatcher",
|
||||
DispatcherSelector.defaultDispatcher().withMailboxFromConfig("my-app.my-special-mailbox"));
|
||||
context.spawn(
|
||||
behavior,
|
||||
"BlockingDispatcher",
|
||||
DispatcherSelector.blocking().withMailboxFromConfig("my-app.my-special-mailbox"));
|
||||
context.spawn(
|
||||
behavior,
|
||||
"ParentDispatcher",
|
||||
DispatcherSelector.sameAsParent().withMailboxFromConfig("my-app.my-special-mailbox"));
|
||||
context.spawn(
|
||||
behavior,
|
||||
"DispatcherFromConfig",
|
||||
DispatcherSelector.fromConfig("your-dispatcher")
|
||||
.withMailboxFromConfig("my-app.my-special-mailbox"));
|
||||
// #interoperability-with-mailbox
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
|
|||
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
|
||||
import org.apache.pekko.actor.typed.ActorRef;
|
||||
import org.apache.pekko.actor.typed.Behavior;
|
||||
import org.apache.pekko.actor.typed.Dispatchers;
|
||||
import org.apache.pekko.actor.typed.MailboxSelector;
|
||||
import org.apache.pekko.actor.typed.javadsl.Behaviors;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
|
@ -59,4 +60,33 @@ public class MailboxDocTest extends JUnitSuite {
|
|||
ActorRef<Void> ref = testKit.spawn(setup);
|
||||
testProbe.receiveMessage();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void startSomeActorsWithMailboxSelectorInteroperability() {
|
||||
TestProbe<Done> testProbe = testKit.createTestProbe();
|
||||
Behavior<String> childBehavior = Behaviors.empty();
|
||||
|
||||
Behavior<Void> setup =
|
||||
Behaviors.setup(
|
||||
context -> {
|
||||
// #interoperability-with-dispatcher
|
||||
context.spawn(
|
||||
childBehavior,
|
||||
"bounded-mailbox-child",
|
||||
MailboxSelector.bounded(100).withDispatcherDefault());
|
||||
|
||||
context.spawn(
|
||||
childBehavior,
|
||||
"from-config-mailbox-child",
|
||||
MailboxSelector.fromConfig("my-app.my-special-mailbox")
|
||||
.withDispatcherFromConfig(Dispatchers.DefaultDispatcherId()));
|
||||
// #interoperability-with-dispatcher
|
||||
|
||||
testProbe.ref().tell(Done.getInstance());
|
||||
return Behaviors.stopped();
|
||||
});
|
||||
|
||||
ActorRef<Void> ref = testKit.spawn(setup);
|
||||
testProbe.receiveMessage();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,6 +40,9 @@ object DispatchersDocSpec {
|
|||
throughput = 1
|
||||
}
|
||||
//#config
|
||||
your-mailbox {
|
||||
mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||||
}
|
||||
""".stripMargin)
|
||||
|
||||
case class WhichDispatcher(replyTo: ActorRef[Dispatcher])
|
||||
|
|
@ -65,6 +68,24 @@ object DispatchersDocSpec {
|
|||
Behaviors.same
|
||||
}
|
||||
|
||||
val interoperableExample = Behaviors.receive[Any] { (context, _) =>
|
||||
// #interoperability-with-mailbox
|
||||
import org.apache.pekko.actor.typed.DispatcherSelector
|
||||
|
||||
context.spawn(yourBehavior, "DefaultDispatcher")
|
||||
context.spawn(yourBehavior, "ExplicitDefaultDispatcher",
|
||||
DispatcherSelector.default().withMailboxFromConfig("your-mailbox"))
|
||||
context.spawn(yourBehavior, "BlockingDispatcher",
|
||||
DispatcherSelector.blocking().withMailboxFromConfig("your-mailbox"))
|
||||
context.spawn(yourBehavior, "ParentDispatcher",
|
||||
DispatcherSelector.sameAsParent().withMailboxFromConfig("your-mailbox"))
|
||||
context.spawn(yourBehavior, "DispatcherFromConfig",
|
||||
DispatcherSelector.fromConfig("your-dispatcher").withMailboxFromConfig("your-mailbox"))
|
||||
// #interoperability-with-mailbox
|
||||
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class DispatchersDocSpec
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import pekko.Done
|
|||
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import pekko.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import pekko.actor.typed.Behavior
|
||||
import pekko.actor.typed.Dispatchers
|
||||
import pekko.actor.typed.MailboxSelector
|
||||
import pekko.actor.typed.scaladsl.Behaviors
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
|
@ -47,6 +48,29 @@ class MailboxDocSpec
|
|||
|
||||
probe.receiveMessage()
|
||||
}
|
||||
|
||||
"interoperability with DispatcherSelector" in {
|
||||
|
||||
val probe = createTestProbe[Done]()
|
||||
val childBehavior: Behavior[String] = Behaviors.empty
|
||||
val parent: Behavior[Unit] = Behaviors.setup { context =>
|
||||
// #interoperability-with-dispatcher
|
||||
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100).withDispatcherDefault)
|
||||
|
||||
val props =
|
||||
MailboxSelector.fromConfig("my-app.my-special-mailbox").withDispatcherFromConfig(
|
||||
Dispatchers.DefaultDispatcherId)
|
||||
context.spawn(childBehavior, "from-config-mailbox-child", props)
|
||||
// #interoperability-with-dispatcher
|
||||
|
||||
probe.ref ! Done
|
||||
Behaviors.stopped
|
||||
}
|
||||
spawn(parent)
|
||||
|
||||
probe.receiveMessage()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,7 +28,9 @@ import pekko.actor.testkit.typed.scaladsl.TestProbe
|
|||
import pekko.actor.typed.ActorRef
|
||||
import pekko.actor.typed.ActorSystem
|
||||
import pekko.actor.typed.Behavior
|
||||
import pekko.actor.typed.DispatcherSelector
|
||||
import pekko.actor.typed.Props
|
||||
import pekko.actor.typed.scaladsl.AskPattern._
|
||||
import pekko.actor.typed.SpawnProtocol
|
||||
|
||||
object DispatcherSelectorSpec {
|
||||
|
|
@ -64,7 +66,7 @@ class DispatcherSelectorSpec(config: Config)
|
|||
|
||||
"DispatcherSelector" must {
|
||||
|
||||
"select dispatcher from config" in {
|
||||
"select dispatcher from empty Props" in {
|
||||
val probe = createTestProbe[Pong]()
|
||||
val pingPong = spawn(PingPong(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
|
||||
pingPong ! Ping(probe.ref)
|
||||
|
|
@ -73,18 +75,27 @@ class DispatcherSelectorSpec(config: Config)
|
|||
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
|
||||
}
|
||||
|
||||
"select dispatcher from DispatcherSelector" in {
|
||||
val probe = createTestProbe[Pong]()
|
||||
val pingPong = spawn(PingPong(), DispatcherSelector.fromConfig("ping-pong-dispatcher"))
|
||||
pingPong ! Ping(probe.ref)
|
||||
|
||||
val response = probe.receiveMessage()
|
||||
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
|
||||
}
|
||||
|
||||
"detect unknown dispatcher from config" in {
|
||||
val probe = createTestProbe[Pong]()
|
||||
LoggingTestKit.error("Spawn failed").expect {
|
||||
val ref = spawn(PingPong(), Props.empty.withDispatcherFromConfig("unknown"))
|
||||
val ref = spawn(PingPong(), DispatcherSelector.fromConfig("unknown"))
|
||||
probe.expectTerminated(ref)
|
||||
}
|
||||
}
|
||||
|
||||
"select same dispatcher as parent" in {
|
||||
val parent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
|
||||
val parent = spawn(SpawnProtocol(), DispatcherSelector.fromConfig("ping-pong-dispatcher"))
|
||||
val childProbe = createTestProbe[ActorRef[Ping]]()
|
||||
parent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref)
|
||||
parent ! SpawnProtocol.Spawn(PingPong(), "child", DispatcherSelector.sameAsParent(), childProbe.ref)
|
||||
|
||||
val probe = createTestProbe[Pong]()
|
||||
val child = childProbe.receiveMessage()
|
||||
|
|
@ -95,19 +106,13 @@ class DispatcherSelectorSpec(config: Config)
|
|||
}
|
||||
|
||||
"select same dispatcher as parent, several levels" in {
|
||||
val grandParent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
|
||||
val parentProbe = createTestProbe[ActorRef[SpawnProtocol.Spawn[Ping]]]()
|
||||
grandParent ! SpawnProtocol.Spawn(
|
||||
SpawnProtocol(),
|
||||
"parent",
|
||||
Props.empty.withDispatcherSameAsParent,
|
||||
parentProbe.ref)
|
||||
|
||||
val childProbe = createTestProbe[ActorRef[Ping]]()
|
||||
grandParent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref)
|
||||
val guardian = spawn(SpawnProtocol(), DispatcherSelector.fromConfig("ping-pong-dispatcher"))
|
||||
val parent: ActorRef[SpawnProtocol.Command] = guardian.ask((replyTo: ActorRef[ActorRef[SpawnProtocol.Command]]) =>
|
||||
SpawnProtocol.Spawn(SpawnProtocol(), "parent", DispatcherSelector.sameAsParent(), replyTo)).futureValue
|
||||
val child: ActorRef[Ping] = parent.ask((reply: ActorRef[ActorRef[Ping]]) =>
|
||||
SpawnProtocol.Spawn(PingPong(), "child", DispatcherSelector.sameAsParent(), reply)).futureValue
|
||||
|
||||
val probe = createTestProbe[Pong]()
|
||||
val child = childProbe.receiveMessage()
|
||||
child ! Ping(probe.ref)
|
||||
|
||||
val response = probe.receiveMessage()
|
||||
|
|
@ -119,7 +124,7 @@ class DispatcherSelectorSpec(config: Config)
|
|||
PingPong(),
|
||||
"DispatcherSelectorSpec2",
|
||||
ActorSystemSetup.create(BootstrapSetup()),
|
||||
Props.empty.withDispatcherSameAsParent)
|
||||
DispatcherSelector.sameAsParent())
|
||||
try {
|
||||
val probe = TestProbe[Pong]()(sys)
|
||||
sys ! Ping(probe.ref)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,132 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.pekko.actor.typed.scaladsl
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
import org.apache.pekko
|
||||
import pekko.actor.ActorCell
|
||||
import pekko.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import pekko.actor.typed.ActorRef
|
||||
import pekko.actor.typed.Behavior
|
||||
import pekko.actor.typed.DispatcherSelector
|
||||
import pekko.actor.typed.MailboxSelector
|
||||
import pekko.actor.typed.Props
|
||||
import pekko.actor.typed.internal.adapter.ActorContextAdapter
|
||||
import pekko.actor.typed.scaladsl.AskPattern._
|
||||
import pekko.dispatch.BoundedMessageQueueSemantics
|
||||
import pekko.dispatch.BoundedNodeMessageQueue
|
||||
import pekko.dispatch.Dispatchers
|
||||
import pekko.dispatch.MessageQueue
|
||||
import pekko.dispatch.NodeMessageQueue
|
||||
|
||||
object MailboxSelectorSpec {
|
||||
val config = ConfigFactory.parseString(
|
||||
"""
|
||||
specific-mailbox {
|
||||
mailbox-type = "org.apache.pekko.dispatch.NonBlockingBoundedMailbox"
|
||||
mailbox-capacity = 4
|
||||
}
|
||||
""")
|
||||
|
||||
object PingPong {
|
||||
case class Ping(replyTo: ActorRef[Pong])
|
||||
|
||||
case class Pong(threadName: String)
|
||||
|
||||
def apply(): Behavior[Ping] =
|
||||
Behaviors.receiveMessage[Ping] { message =>
|
||||
message.replyTo ! Pong(Thread.currentThread().getName)
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class MailboxSelectorSpec(config: Config)
|
||||
extends ScalaTestWithActorTestKit(config)
|
||||
with AnyWordSpecLike
|
||||
with LogCapturing {
|
||||
|
||||
def this() = this(MailboxSelectorSpec.config)
|
||||
|
||||
sealed trait Command
|
||||
case class WhatsYourMailbox(replyTo: ActorRef[MessageQueue]) extends Command
|
||||
case class WhatsYourDispatcher(replyTo: ActorRef[String]) extends Command
|
||||
|
||||
private def extract[R](context: ActorContext[_], f: ActorCell => R): R = {
|
||||
context match {
|
||||
case adapter: ActorContextAdapter[_] =>
|
||||
adapter.classicActorContext match {
|
||||
case cell: ActorCell => f(cell)
|
||||
case unexpected => throw new RuntimeException(s"Unexpected: $unexpected")
|
||||
}
|
||||
case unexpected => throw new RuntimeException(s"Unexpected: $unexpected")
|
||||
}
|
||||
}
|
||||
|
||||
private def behavior: Behavior[Command] =
|
||||
Behaviors.setup { context =>
|
||||
Behaviors.receiveMessage[Command] {
|
||||
case WhatsYourMailbox(replyTo) =>
|
||||
replyTo ! extract(context, cell => cell.mailbox.messageQueue)
|
||||
Behaviors.same
|
||||
case WhatsYourDispatcher(replyTo) =>
|
||||
replyTo ! extract(context, cell => cell.dispatcher.id)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
"MailboxSelectorSpec" must {
|
||||
|
||||
"default is unbounded" in {
|
||||
val actor = spawn(behavior)
|
||||
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
|
||||
mailbox shouldBe a[NodeMessageQueue]
|
||||
}
|
||||
|
||||
"select an specific mailbox from MailboxSelector " in {
|
||||
val actor = spawn(behavior, MailboxSelector.fromConfig("specific-mailbox"))
|
||||
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
|
||||
mailbox shouldBe a[BoundedMessageQueueSemantics]
|
||||
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
|
||||
}
|
||||
|
||||
"select an specific mailbox from empty Props " in {
|
||||
val actor = spawn(behavior, Props.empty.withMailboxFromConfig("specific-mailbox"))
|
||||
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
|
||||
mailbox shouldBe a[BoundedMessageQueueSemantics]
|
||||
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
|
||||
}
|
||||
|
||||
"select an specific mailbox from DispatcherSelector " in {
|
||||
val actor = spawn(behavior, DispatcherSelector.blocking().withMailboxFromConfig("specific-mailbox"))
|
||||
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
|
||||
mailbox shouldBe a[BoundedMessageQueueSemantics]
|
||||
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
|
||||
val dispatcher = actor.ask(WhatsYourDispatcher(_)).futureValue
|
||||
dispatcher shouldBe Dispatchers.DefaultBlockingDispatcherId
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -81,6 +81,14 @@ abstract class Props private[pekko] () extends Product with Serializable {
|
|||
*/
|
||||
def withDispatcherFromConfig(path: String): Props = DispatcherFromConfig(path, this)
|
||||
|
||||
/**
|
||||
* Prepend a selection of the mailbox found at the given Config path to this Props.
|
||||
* The path is relative to the configuration root of the [[ActorSystem]] that looks up the
|
||||
* mailbox.
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def withMailboxFromConfig(path: String): Props = MailboxFromConfigSelector(path, this)
|
||||
|
||||
/**
|
||||
* Prepend a selection of the same executor as the parent actor to this Props.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -324,7 +324,7 @@ pekko {
|
|||
chance-of-exploration = 0.4
|
||||
|
||||
# When downsizing after a long streak of under-utilization, the resizer
|
||||
# will downsize the pool to the highest utilization multiplied by a
|
||||
# will downsize the pool to the highest utilization multiplied by
|
||||
# a downsize ratio. This downsize ratio determines the new pools size
|
||||
# in comparison to the highest utilization.
|
||||
# E.g. if the highest utilization is 10, and the down size ratio
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ A default dispatcher is used for all actors that are spawned without specifying
|
|||
This is suitable for all actors that don't block. Blocking in actors needs to be carefully managed, more
|
||||
details @ref:[here](#blocking-needs-careful-management).
|
||||
|
||||
To select a dispatcher use `DispatcherSelector` to create a `Props` instance for spawning your actor:
|
||||
To select a dispatcher use @apidoc[DispatcherSelector](DispatcherSelector$) to create a @apidoc[Props](typed.Props) instance for spawning your actor:
|
||||
|
||||
Scala
|
||||
: @@snip [DispatcherDocSpec.scala](/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala) { #spawn-dispatcher }
|
||||
|
|
@ -74,6 +74,17 @@ The final example shows how to load a custom dispatcher from configuration and r
|
|||
<!-- Same between Java and Scala -->
|
||||
@@snip [DispatcherDocSpec.scala](/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala) { #config }
|
||||
|
||||
### Interoperability with MailboxSelector
|
||||
|
||||
The @apidoc[DispatcherSelector](DispatcherSelector$) will create a @apidoc[Props](typed.Props) instance that can be both set up Dispatcher and Mailbox,
|
||||
which means that you can continue to set up Mailbox through chain calls.
|
||||
|
||||
Scala
|
||||
: @@snip [DispatcherDocSpec.scala](/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala) { #interoperability-with-mailbox }
|
||||
|
||||
Java
|
||||
: @@snip [DispatcherDocTest.java](/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/DispatchersDocTest.java) { #interoperability-with-mailbox }
|
||||
|
||||
## Types of dispatchers
|
||||
|
||||
There are 2 different types of message dispatchers:
|
||||
|
|
|
|||
|
|
@ -58,6 +58,17 @@ configuration section from the @apidoc[ActorSystem](typed.ActorSystem) configura
|
|||
`id` key with the configuration path of the mailbox type and adding a
|
||||
fall-back to the default mailbox configuration section.
|
||||
|
||||
### Interoperability with DispatcherSelector
|
||||
|
||||
The @apidoc[MailboxSelector](MailboxSelector$) will create a @apidoc[Props](typed.Props) instance that can be both set up Dispatcher and Mailbox,
|
||||
which means that you can continue to set up Dispatcher through chain calls.
|
||||
|
||||
Scala
|
||||
: @@snip [MailboxDocSpec.scala](/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/MailboxDocSpec.scala) { #interoperability-with-dispatcher }
|
||||
|
||||
Java
|
||||
: @@snip [MailboxDocTest.java](/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/MailboxDocTest.java) { #interoperability-with-dispatcher }
|
||||
|
||||
## Mailbox Implementations
|
||||
|
||||
Pekko ships with a number of mailbox implementations:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue