re-enable the missing three multi-jvm tests
- mostly a matter of putting correct target.nodes URIs into configs and adding withRouting to Props - ScatterGather test was wrong to assume equal distribution of replies from connections; it is random and only the sum should add up
This commit is contained in:
parent
d1a26a9e28
commit
db7dd9450c
19 changed files with 59 additions and 55 deletions
|
|
@ -223,7 +223,7 @@ private[akka] class ActorCell(
|
|||
final var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
|
||||
|
||||
private def _actorOf(props: Props, name: String): ActorRef = {
|
||||
val actor = provider.actorOf(systemImpl, props, self, self.path / name, false)
|
||||
val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None)
|
||||
childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor))
|
||||
actor
|
||||
}
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ trait ActorRefProvider {
|
|||
* in case of remote supervision). If systemService is true, deployment is
|
||||
* bypassed (local-only).
|
||||
*/
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy] = None): InternalActorRef
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef
|
||||
|
||||
/**
|
||||
* Create actor reference for a specified local or remote path. If no such
|
||||
|
|
@ -481,10 +481,10 @@ class LocalActorRefProvider(
|
|||
}
|
||||
|
||||
lazy val guardian: InternalActorRef =
|
||||
actorOf(system, guardianProps, rootGuardian, rootPath / "user", true)
|
||||
actorOf(system, guardianProps, rootGuardian, rootPath / "user", true, None)
|
||||
|
||||
lazy val systemGuardian: InternalActorRef =
|
||||
actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true)
|
||||
actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None)
|
||||
|
||||
lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log)
|
||||
|
||||
|
|
@ -518,7 +518,7 @@ class LocalActorRefProvider(
|
|||
case x ⇒ x
|
||||
}
|
||||
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy] = None): InternalActorRef = {
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = {
|
||||
props.routerConfig match {
|
||||
case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
|
||||
case router ⇒
|
||||
|
|
|
|||
|
|
@ -201,7 +201,7 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa
|
|||
val subpath = elems.drop(1)
|
||||
val path = remote.remoteDaemon.path / subpath
|
||||
val supervisor = system.actorFor(message.getSupervisor).asInstanceOf[InternalActorRef]
|
||||
val actor = system.provider.actorOf(system, Props(creator = actorFactory), supervisor, path, true)
|
||||
val actor = system.provider.actorOf(system, Props(creator = actorFactory), supervisor, path, true, None)
|
||||
addChild(subpath.mkString("/"), actor)
|
||||
system.deathWatch.subscribe(this, actor)
|
||||
case _ ⇒
|
||||
|
|
|
|||
|
|
@ -57,8 +57,8 @@ class RemoteActorRefProvider(
|
|||
terminationFuture.onComplete(_ ⇒ remote.server.shutdown())
|
||||
}
|
||||
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy] = None): InternalActorRef =
|
||||
if (systemService) local.actorOf(system, props, supervisor, path, systemService)
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = {
|
||||
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy)
|
||||
else {
|
||||
|
||||
/*
|
||||
|
|
@ -105,20 +105,23 @@ class RemoteActorRefProvider(
|
|||
|
||||
deployment match {
|
||||
case Some(Deploy(_, _, _, _, RemoteScope(address))) ⇒
|
||||
if (address == rootPath.address) local.actorOf(system, props, supervisor, path, false)
|
||||
else address.parse(remote.transports) match {
|
||||
// FIXME RK this should be done within the deployer, i.e. the whole parsing business
|
||||
address.parse(remote.transports) match {
|
||||
case Left(x) ⇒
|
||||
// FIXME RK this should be done within the deployer, i.e. the whole parsing business
|
||||
throw new ConfigurationException("cannot parse remote address: " + x)
|
||||
case Right(addr) ⇒
|
||||
val rpath = RootActorPath(addr) / "remote" / rootPath.address.hostPort / path.elements
|
||||
useActorOnNode(rpath, props.creator, supervisor)
|
||||
new RemoteActorRef(this, remote.server, rpath, supervisor, None)
|
||||
if (addr == rootPath.address) local.actorOf(system, props, supervisor, path, false, deployment)
|
||||
else {
|
||||
val rpath = RootActorPath(addr) / "remote" / rootPath.address.hostPort / path.elements
|
||||
useActorOnNode(rpath, props.creator, supervisor)
|
||||
new RemoteActorRef(this, remote.server, rpath, supervisor, None)
|
||||
}
|
||||
}
|
||||
|
||||
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService)
|
||||
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def actorFor(path: ActorPath): InternalActorRef = path.root match {
|
||||
case `rootPath` ⇒ actorFor(rootGuardian, path.elements)
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "random"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,6 @@ akka {
|
|||
provider = "akka.remote.RemoteActorRefProvider"
|
||||
deployment./service-hello.router = "random"
|
||||
deployment./service-hello.nr-of-instances = 3
|
||||
deployment./service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
deployment./service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "random"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "random"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
package akka.remote.random_routed
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.{ Actor, Props }
|
||||
import akka.remote._
|
||||
import akka.routing._
|
||||
import akka.testkit.DefaultTimeout
|
||||
|
|
@ -19,7 +19,7 @@ class RandomRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec {
|
|||
import RandomRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"___" must {
|
||||
"___" ignore {
|
||||
"___" in {
|
||||
barrier("start")
|
||||
barrier("broadcast-end")
|
||||
barrier("end")
|
||||
|
|
@ -32,7 +32,7 @@ class RandomRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec {
|
|||
import RandomRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"___" must {
|
||||
"___" ignore {
|
||||
"___" in {
|
||||
barrier("start")
|
||||
barrier("broadcast-end")
|
||||
barrier("end")
|
||||
|
|
@ -45,7 +45,7 @@ class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec {
|
|||
import RandomRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"___" must {
|
||||
"___" ignore {
|
||||
"___" in {
|
||||
barrier("start")
|
||||
barrier("broadcast-end")
|
||||
barrier("end")
|
||||
|
|
@ -58,10 +58,10 @@ class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTi
|
|||
import RandomRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"A new remote actor configured with a Random router" must {
|
||||
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" ignore {
|
||||
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
|
||||
|
||||
barrier("start")
|
||||
val actor = system.actorOf[SomeActor]("service-hello")
|
||||
val actor = system.actorOf(Props[SomeActor].withRouting(RoundRobinRouter()), "service-hello")
|
||||
actor.isInstanceOf[RoutedActorRef] must be(true)
|
||||
|
||||
val connectionCount = NrOfNodes - 1
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "round-robin"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "round-robin"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "round-robin"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "round-robin"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
package akka.remote.round_robin_routed
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.{ Actor, Props }
|
||||
import akka.remote._
|
||||
import akka.routing._
|
||||
import akka.testkit.DefaultTimeout
|
||||
|
|
@ -19,7 +19,7 @@ class RoundRobinRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec {
|
|||
import RoundRobinRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"___" must {
|
||||
"___" ignore {
|
||||
"___" in {
|
||||
barrier("start")
|
||||
barrier("broadcast-end")
|
||||
barrier("end")
|
||||
|
|
@ -32,7 +32,7 @@ class RoundRobinRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec {
|
|||
import RoundRobinRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"___" must {
|
||||
"___" ignore {
|
||||
"___" in {
|
||||
barrier("start")
|
||||
barrier("broadcast-end")
|
||||
barrier("end")
|
||||
|
|
@ -45,7 +45,7 @@ class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec {
|
|||
import RoundRobinRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"___" must {
|
||||
"___" ignore {
|
||||
"___" in {
|
||||
barrier("start")
|
||||
barrier("broadcast-end")
|
||||
barrier("end")
|
||||
|
|
@ -58,10 +58,10 @@ class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with Defau
|
|||
import RoundRobinRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"A new remote actor configured with a RoundRobin router" must {
|
||||
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" ignore {
|
||||
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
|
||||
|
||||
barrier("start")
|
||||
val actor = system.actorOf[SomeActor]("service-hello")
|
||||
val actor = system.actorOf(Props[SomeActor].withRouting(RoundRobinRouter()), "service-hello")
|
||||
actor.isInstanceOf[RoutedActorRef] must be(true)
|
||||
|
||||
val connectionCount = NrOfNodes - 1
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "scatter-gather"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "scatter-gather"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "scatter-gather"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ akka {
|
|||
deployment {
|
||||
/service-hello.router = "scatter-gather"
|
||||
/service-hello.nr-of-instances = 3
|
||||
/service-hello.target.hosts = ["localhost:9991","localhost:9992","localhost:9993"]
|
||||
/service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
package akka.remote.scatter_gather_routed
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.{ Actor, Props }
|
||||
import akka.remote._
|
||||
import akka.routing._
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object ScatterGatherRoutedRemoteActorMultiJvmSpec {
|
||||
val NrOfNodes = 4
|
||||
|
|
@ -19,7 +20,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec {
|
|||
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"___" must {
|
||||
"___" ignore {
|
||||
"___" in {
|
||||
barrier("start")
|
||||
barrier("broadcast-end")
|
||||
barrier("end")
|
||||
|
|
@ -32,7 +33,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec {
|
|||
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"___" must {
|
||||
"___" ignore {
|
||||
"___" in {
|
||||
barrier("start")
|
||||
barrier("broadcast-end")
|
||||
barrier("end")
|
||||
|
|
@ -45,7 +46,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec {
|
|||
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"___" must {
|
||||
"___" ignore {
|
||||
"___" in {
|
||||
barrier("start")
|
||||
barrier("broadcast-end")
|
||||
barrier("end")
|
||||
|
|
@ -54,37 +55,37 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec {
|
|||
}
|
||||
}
|
||||
|
||||
class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTimeout {
|
||||
class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTimeout with ImplicitSender {
|
||||
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
|
||||
val nodes = NrOfNodes
|
||||
"A new remote actor configured with a ScatterGather router" must {
|
||||
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" ignore {
|
||||
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
|
||||
|
||||
barrier("start")
|
||||
val actor = system.actorOf[SomeActor]("service-hello")
|
||||
val actor = system.actorOf(Props[SomeActor].withRouting(RoundRobinRouter()), "service-hello")
|
||||
actor.isInstanceOf[RoutedActorRef] must be(true)
|
||||
//actor.asInstanceOf[RoutedActorRef].router.isInstanceOf[ScatterGatherFirstCompletedRouter] must be(true)
|
||||
|
||||
val connectionCount = NrOfNodes - 1
|
||||
val iterationCount = 10
|
||||
|
||||
var replies = Map(
|
||||
"node1" -> 0,
|
||||
"node2" -> 0,
|
||||
"node3" -> 0)
|
||||
|
||||
for (i ← 0 until iterationCount) {
|
||||
for (k ← 0 until connectionCount) {
|
||||
val nodeName = (actor ? "hit").as[String].getOrElse(fail("No id returned by actor"))
|
||||
replies = replies + (nodeName -> (replies(nodeName) + 1))
|
||||
actor ! "hit"
|
||||
}
|
||||
}
|
||||
|
||||
val replies = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
|
||||
case name: String ⇒ (name, 1)
|
||||
}).foldLeft(Map("node1" -> 0, "node2" -> 0, "node3" -> 0)) {
|
||||
case (m, (n, c)) ⇒ m + (n -> (m(n) + c))
|
||||
}
|
||||
|
||||
barrier("broadcast-end")
|
||||
actor ! Broadcast("end")
|
||||
|
||||
barrier("end")
|
||||
replies.values foreach { _ must be(10) }
|
||||
replies.values.sum must be === connectionCount * iterationCount
|
||||
|
||||
barrier("done")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue