make DNS actor work with RemoteActorRefProvider, #18650
* The configuration of the pool of the SimpleDnsManager is configured in deployment section "/IO-DNS/inet-address" * We don't really support deployment configuration of system actors but here it's used and I don't think we can change that. * It didn't work when using RemoteActorRefProvider/ClusterActorRefProvider, so I fixed it so that the behavior is consistent with the LocalActorRefProvider (verified by tests)
This commit is contained in:
parent
b9d1fa74ba
commit
fcd7561b2f
3 changed files with 101 additions and 45 deletions
|
|
@ -14,6 +14,11 @@ import akka.testkit.{ ImplicitSender, DefaultTimeout, AkkaSpec }
|
|||
import akka.pattern.gracefulStop
|
||||
import com.typesafe.config.Config
|
||||
import akka.actor.ActorSystem
|
||||
import akka.testkit.TestProbe
|
||||
import akka.testkit.TestProbe
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.testkit.TestActors.echoActorProps
|
||||
import akka.actor.ActorPath
|
||||
|
||||
object ConfiguredLocalRoutingSpec {
|
||||
val config = """
|
||||
|
|
@ -51,6 +56,10 @@ object ConfiguredLocalRoutingSpec {
|
|||
router = "akka.routing.ConfiguredLocalRoutingSpec$MyRouter"
|
||||
foo = bar
|
||||
}
|
||||
/sys-parent/round {
|
||||
router = round-robin-pool
|
||||
nr-of-instances = 6
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -81,6 +90,13 @@ object ConfiguredLocalRoutingSpec {
|
|||
def receive = { case _ ⇒ }
|
||||
}
|
||||
|
||||
class Parent extends Actor {
|
||||
def receive = {
|
||||
case (p: Props, name: String) ⇒
|
||||
sender() ! context.actorOf(p, name)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
@ -95,6 +111,15 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
|
|||
}
|
||||
}
|
||||
|
||||
def collectRouteePaths(probe: TestProbe, router: ActorRef, n: Int): immutable.Seq[ActorPath] = {
|
||||
for (i ← 1 to n) yield {
|
||||
val msg = i.toString
|
||||
router.tell(msg, probe.ref)
|
||||
probe.expectMsg(msg)
|
||||
probe.lastSender.path
|
||||
}
|
||||
}
|
||||
|
||||
"RouterConfig" must {
|
||||
|
||||
"be picked up from Props" in {
|
||||
|
|
@ -149,6 +174,19 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
|
|||
expectMsg("bar")
|
||||
}
|
||||
|
||||
"load settings from config for local child router of system actor" in {
|
||||
// we don't really support deployment configuration of system actors, but
|
||||
// it's used for the pool of the SimpleDnsManager "/IO-DNS/inet-address"
|
||||
val probe = TestProbe()
|
||||
val parent = system.asInstanceOf[ExtendedActorSystem].systemActorOf(Props[Parent], "sys-parent")
|
||||
parent.tell((FromConfig.props(echoActorProps), "round"), probe.ref)
|
||||
val router = probe.expectMsgType[ActorRef]
|
||||
val replies = collectRouteePaths(probe, router, 10)
|
||||
val children = replies.toSet
|
||||
children should have size 6
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -263,9 +263,9 @@ private[akka] class RemoteActorRefProvider(
|
|||
val lookup =
|
||||
if (lookupDeploy)
|
||||
elems.head match {
|
||||
case "user" ⇒ deployer.lookup(elems.drop(1))
|
||||
case "remote" ⇒ lookupRemotes(elems)
|
||||
case _ ⇒ None
|
||||
case "user" | "system" ⇒ deployer.lookup(elems.drop(1))
|
||||
case "remote" ⇒ lookupRemotes(elems)
|
||||
case _ ⇒ None
|
||||
}
|
||||
else None
|
||||
|
||||
|
|
|
|||
|
|
@ -3,16 +3,19 @@
|
|||
*/
|
||||
package akka.remote
|
||||
|
||||
import scala.collection.immutable
|
||||
import akka.testkit._
|
||||
import akka.routing._
|
||||
import akka.actor._
|
||||
import akka.remote.routing._
|
||||
import com.typesafe.config._
|
||||
import akka.testkit.TestActors.echoActorProps
|
||||
|
||||
object RemoteRouterSpec {
|
||||
class Echo extends Actor {
|
||||
class Parent extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒ sender() ! self
|
||||
case (p: Props, name: String) ⇒
|
||||
sender() ! context.actorOf(p, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -29,6 +32,14 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
router = round-robin-pool
|
||||
nr-of-instances = 4
|
||||
}
|
||||
/round {
|
||||
router = round-robin-pool
|
||||
nr-of-instances = 5
|
||||
}
|
||||
/sys-parent/round {
|
||||
router = round-robin-pool
|
||||
nr-of-instances = 6
|
||||
}
|
||||
}""") {
|
||||
|
||||
import RemoteRouterSpec._
|
||||
|
|
@ -76,15 +87,21 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
shutdown(masterSystem)
|
||||
}
|
||||
|
||||
def collectRouteePaths(probe: TestProbe, router: ActorRef, n: Int): immutable.Seq[ActorPath] = {
|
||||
for (i ← 1 to n) yield {
|
||||
val msg = i.toString
|
||||
router.tell(msg, probe.ref)
|
||||
probe.expectMsg(msg)
|
||||
probe.lastSender.path
|
||||
}
|
||||
}
|
||||
|
||||
"A Remote Router" must {
|
||||
|
||||
"deploy its children on remote host driven by configuration" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(Props[Echo]), "blub")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router.tell("", probe.ref)
|
||||
probe.expectMsgType[ActorRef].path
|
||||
}
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps), "blub")
|
||||
val replies = collectRouteePaths(probe, router, 5)
|
||||
val children = replies.toSet
|
||||
children should have size 2
|
||||
children.map(_.parent) should have size 1
|
||||
|
|
@ -95,11 +112,8 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
"deploy its children on remote host driven by programatic definition" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(new RemoteRouterConfig(RoundRobinPool(2),
|
||||
Seq(Address("akka.tcp", sysName, "localhost", port))).props(Props[Echo]), "blub2")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router.tell("", probe.ref)
|
||||
probe.expectMsgType[ActorRef].path
|
||||
}
|
||||
Seq(Address("akka.tcp", sysName, "localhost", port))).props(echoActorProps), "blub2")
|
||||
val replies = collectRouteePaths(probe, router, 5)
|
||||
val children = replies.toSet
|
||||
children should have size 2
|
||||
children.map(_.parent) should have size 1
|
||||
|
|
@ -109,11 +123,8 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
|
||||
"deploy dynamic resizable number of children on remote host driven by configuration" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(FromConfig.props(Props[Echo]), "elastic-blub")
|
||||
val replies = for (i ← 1 to 5000) yield {
|
||||
router.tell("", probe.ref)
|
||||
probe.expectMsgType[ActorRef].path
|
||||
}
|
||||
val router = masterSystem.actorOf(FromConfig.props(echoActorProps), "elastic-blub")
|
||||
val replies = collectRouteePaths(probe, router, 5000)
|
||||
val children = replies.toSet
|
||||
children.size should be >= 2
|
||||
children.map(_.parent) should have size 1
|
||||
|
|
@ -123,12 +134,9 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
|
||||
"deploy remote routers based on configuration" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(FromConfig.props(Props[Echo]), "remote-blub")
|
||||
val router = masterSystem.actorOf(FromConfig.props(echoActorProps), "remote-blub")
|
||||
router.path.address.toString should ===(s"akka.tcp://${sysName}@localhost:${port}")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router.tell("", probe.ref)
|
||||
probe.expectMsgType[ActorRef].path
|
||||
}
|
||||
val replies = collectRouteePaths(probe, router, 5)
|
||||
val children = replies.toSet
|
||||
children should have size 2
|
||||
val parents = children.map(_.parent)
|
||||
|
|
@ -140,13 +148,10 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
|
||||
"deploy remote routers based on explicit deployment" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(Props[Echo])
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps)
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"akka.tcp://${sysName}@localhost:${port}")))), "remote-blub2")
|
||||
router.path.address.toString should ===(s"akka.tcp://${sysName}@localhost:${port}")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router.tell("", probe.ref)
|
||||
probe.expectMsgType[ActorRef].path
|
||||
}
|
||||
val replies = collectRouteePaths(probe, router, 5)
|
||||
val children = replies.toSet
|
||||
children should have size 2
|
||||
val parents = children.map(_.parent)
|
||||
|
|
@ -158,13 +163,10 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
|
||||
"let remote deployment be overridden by local configuration" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(Props[Echo])
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps)
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"akka.tcp://${sysName}@localhost:${port}")))), "local-blub")
|
||||
router.path.address.toString should ===("akka://MasterRemoteRouterSpec")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router.tell("", probe.ref)
|
||||
probe.expectMsgType[ActorRef].path
|
||||
}
|
||||
val replies = collectRouteePaths(probe, router, 5)
|
||||
val children = replies.toSet
|
||||
children should have size 2
|
||||
val parents = children.map(_.parent)
|
||||
|
|
@ -176,13 +178,10 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
|
||||
"let remote deployment router be overridden by local configuration" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(Props[Echo])
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps)
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"akka.tcp://${sysName}@localhost:${port}")))), "local-blub2")
|
||||
router.path.address.toString should ===(s"akka.tcp://${sysName}@localhost:${port}")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router.tell("", probe.ref)
|
||||
probe.expectMsgType[ActorRef].path
|
||||
}
|
||||
val replies = collectRouteePaths(probe, router, 5)
|
||||
val children = replies.toSet
|
||||
children should have size 4
|
||||
val parents = children.map(_.parent)
|
||||
|
|
@ -194,13 +193,10 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
|
||||
"let remote deployment be overridden by remote configuration" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(Props[Echo])
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps)
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"akka.tcp://${sysName}@localhost:${port}")))), "remote-override")
|
||||
router.path.address.toString should ===(s"akka.tcp://${sysName}@localhost:${port}")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router.tell("", probe.ref)
|
||||
probe.expectMsgType[ActorRef].path
|
||||
}
|
||||
val replies = collectRouteePaths(probe, router, 5)
|
||||
val children = replies.toSet
|
||||
children should have size 4
|
||||
val parents = children.map(_.parent)
|
||||
|
|
@ -226,6 +222,28 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
probe.expectMsgType[ActorKilledException]
|
||||
}
|
||||
|
||||
"load settings from config for local router" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(FromConfig.props(echoActorProps), "round")
|
||||
val replies = collectRouteePaths(probe, router, 10)
|
||||
val children = replies.toSet
|
||||
children should have size 5
|
||||
masterSystem.stop(router)
|
||||
}
|
||||
|
||||
"load settings from config for local child router of system actor" in {
|
||||
// we don't really support deployment configuration of system actors, but
|
||||
// it's used for the pool of the SimpleDnsManager "/IO-DNS/inet-address"
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val parent = masterSystem.asInstanceOf[ExtendedActorSystem].systemActorOf(Props[Parent], "sys-parent")
|
||||
parent.tell((FromConfig.props(echoActorProps), "round"), probe.ref)
|
||||
val router = probe.expectMsgType[ActorRef]
|
||||
val replies = collectRouteePaths(probe, router, 10)
|
||||
val children = replies.toSet
|
||||
children should have size 6
|
||||
masterSystem.stop(router)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue