diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala new file mode 100755 index 0000000000..9e2960465e --- /dev/null +++ b/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala @@ -0,0 +1,23 @@ +package akka.remote + +import com.typesafe.config.{Config, ConfigFactory} + +trait AbstractRemoteActorMultiJvmSpec { + def NrOfNodes: Int + def commonConfig: Config + + def remotes: List[String] = { + val listOpt = Option(System.getProperty("test.hosts")).map(_.split(",").toList) + listOpt getOrElse List.fill(NrOfNodes)("localhost") + } + + val nodeConfigs = ((1 to NrOfNodes).toList zip remotes) map { + case (idx, host) => + ConfigFactory.parseString(""" + akka { + remote.server.hostname="%s" + remote.server.port = "999%d" + cluster.nodename = "node%d" + }""".format(host, idx, idx)) withFallback commonConfig + } +} diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/Barrier.scala b/akka-remote/src/multi-jvm/scala/akka/remote/Barrier.scala new file mode 100755 index 0000000000..6474179001 --- /dev/null +++ b/akka-remote/src/multi-jvm/scala/akka/remote/Barrier.scala @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.remote + +trait Barrier { + def await() = { enter(); leave() } + + def apply(body: ⇒ Unit) { + enter() + body + leave() + } + + def enter(): Unit + + def leave(): Unit +} diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala index db2704882c..11a1d2fe86 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala @@ -6,8 +6,8 @@ import akka.actor.{ Actor, Props } import akka.testkit._ import akka.dispatch.Await -object DirectRoutedRemoteActorMultiJvmSpec { - val NrOfNodes = 2 +object DirectRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { + override def NrOfNodes = 2 class SomeActor extends Actor with Serializable { def receive = { @@ -16,7 +16,7 @@ object DirectRoutedRemoteActorMultiJvmSpec { } import com.typesafe.config.ConfigFactory - val commonConfig = ConfigFactory.parseString(""" + override def commonConfig = ConfigFactory.parseString(""" akka { loglevel = "WARNING" actor { @@ -25,23 +25,10 @@ object DirectRoutedRemoteActorMultiJvmSpec { /service-hello.remote = "akka://AkkaRemoteSpec@localhost:9991" } } - remote.server.hostname = "localhost" }""") - - val node1Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9991" - cluster.nodename = "node1" - }""") withFallback commonConfig - - val node2Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9992" - cluster.nodename = "node2" - }""") withFallback commonConfig } -class DirectRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(DirectRoutedRemoteActorMultiJvmSpec.node1Config) { +class DirectRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(DirectRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) { import DirectRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes @@ -53,7 +40,7 @@ class DirectRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(DirectRoutedRe } } -class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(DirectRoutedRemoteActorMultiJvmSpec.node2Config) with DefaultTimeout { +class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(DirectRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) with DefaultTimeout { import DirectRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/FileBasedBarrier.scala b/akka-remote/src/multi-jvm/scala/akka/remote/FileBasedBarrier.scala index 2b14347e98..68347715d2 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/FileBasedBarrier.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/FileBasedBarrier.scala @@ -26,7 +26,7 @@ class FileBasedBarrier( group: String, node: String, timeout: Duration = FileBasedBarrier.DefaultTimeout, - sleep: Duration = FileBasedBarrier.DefaultSleep) { + sleep: Duration = FileBasedBarrier.DefaultSleep) extends Barrier { val barrierDir = { val dir = new File(new File(new File(FileBasedBarrier.HomeDir), group), name) @@ -38,14 +38,6 @@ class FileBasedBarrier( val readyFile = new File(barrierDir, "ready") - def await() = { enter(); leave() } - - def apply(body: ⇒ Unit) { - enter() - body - leave() - } - def enter() = { createNode() if (nodesPresent >= count) createReady() diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala index 85bd5f4a56..a8fbffff4d 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala @@ -7,8 +7,8 @@ import akka.testkit._ import akka.util.duration._ import akka.dispatch.Await -object NewRemoteActorMultiJvmSpec { - val NrOfNodes = 2 +object NewRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { + override def NrOfNodes = 2 class SomeActor extends Actor with Serializable { def receive = { @@ -17,7 +17,7 @@ object NewRemoteActorMultiJvmSpec { } import com.typesafe.config.ConfigFactory - val commonConfig = ConfigFactory.parseString(""" + override def commonConfig = ConfigFactory.parseString(""" akka { loglevel = "WARNING" actor { @@ -26,23 +26,10 @@ object NewRemoteActorMultiJvmSpec { /service-hello.remote = "akka://AkkaRemoteSpec@localhost:9991" } } - remote.server.hostname = "localhost" }""") - - val node1Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9991" - cluster.nodename = "node1" - }""") withFallback commonConfig - - val node2Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9992" - cluster.nodename = "node2" - }""") withFallback commonConfig } -class NewRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(NewRemoteActorMultiJvmSpec.node1Config) { +class NewRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(NewRemoteActorMultiJvmSpec.nodeConfigs(0)) { import NewRemoteActorMultiJvmSpec._ @@ -57,7 +44,7 @@ class NewRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(NewRemoteActorMultiJvmS } } -class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(NewRemoteActorMultiJvmSpec.node2Config) with DefaultTimeout { +class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(NewRemoteActorMultiJvmSpec.nodeConfigs(1)) with DefaultTimeout { import NewRemoteActorMultiJvmSpec._ diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala index 16d3a49342..33bb8cd36e 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -6,8 +6,8 @@ import akka.routing._ import akka.testkit.DefaultTimeout import akka.dispatch.Await -object RandomRoutedRemoteActorMultiJvmSpec { - val NrOfNodes = 4 +object RandomRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { + override def NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { case "hit" ⇒ sender ! context.system.nodename @@ -16,7 +16,7 @@ object RandomRoutedRemoteActorMultiJvmSpec { } import com.typesafe.config.ConfigFactory - val commonConfig = ConfigFactory.parseString(""" + override def commonConfig = ConfigFactory.parseString(""" akka { loglevel = "WARNING" actor { @@ -27,35 +27,10 @@ object RandomRoutedRemoteActorMultiJvmSpec { /service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"] } } - remote.server.hostname = "localhost" }""") - - val node1Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9991" - cluster.nodename = "node1" - }""") withFallback commonConfig - - val node2Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9992" - cluster.nodename = "node2" - }""") withFallback commonConfig - - val node3Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9993" - cluster.nodename = "node3" - }""") withFallback commonConfig - - val node4Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9994" - cluster.nodename = "node4" - }""") withFallback commonConfig } -class RandomRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.node1Config) { +class RandomRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) { import RandomRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { @@ -68,7 +43,7 @@ class RandomRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RandomRoutedRe } } -class RandomRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.node2Config) { +class RandomRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) { import RandomRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { @@ -81,7 +56,7 @@ class RandomRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RandomRoutedRe } } -class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.node3Config) { +class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(2)) { import RandomRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { @@ -94,7 +69,7 @@ class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RandomRoutedRe } } -class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.node4Config) with DefaultTimeout { +class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(3)) with DefaultTimeout { import RandomRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "A new remote actor configured with a Random router" must { diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index 61b99a2b3e..a04d7bd855 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -6,8 +6,9 @@ import akka.routing._ import akka.testkit.DefaultTimeout import akka.dispatch.Await -object RoundRobinRoutedRemoteActorMultiJvmSpec { - val NrOfNodes = 4 +object RoundRobinRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { + override def NrOfNodes = 4 + class SomeActor extends Actor with Serializable { def receive = { case "hit" ⇒ sender ! context.system.nodename @@ -16,7 +17,7 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec { } import com.typesafe.config.ConfigFactory - val commonConfig = ConfigFactory.parseString(""" + override def commonConfig = ConfigFactory.parseString(""" akka { loglevel = "WARNING" actor { @@ -27,35 +28,10 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec { /service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"] } } - remote.server.hostname = "localhost" }""") - - val node1Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9991" - cluster.nodename = "node1" - }""") withFallback commonConfig - - val node2Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9992" - cluster.nodename = "node2" - }""") withFallback commonConfig - - val node3Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9993" - cluster.nodename = "node3" - }""") withFallback commonConfig - - val node4Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9994" - cluster.nodename = "node4" - }""") withFallback commonConfig } -class RoundRobinRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.node1Config) { +class RoundRobinRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) { import RoundRobinRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { @@ -68,7 +44,7 @@ class RoundRobinRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RoundRobin } } -class RoundRobinRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.node2Config) { +class RoundRobinRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) { import RoundRobinRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { @@ -81,7 +57,7 @@ class RoundRobinRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RoundRobin } } -class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.node3Config) { +class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(2)) { import RoundRobinRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { @@ -94,7 +70,7 @@ class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RoundRobin } } -class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.node4Config) with DefaultTimeout { +class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(3)) with DefaultTimeout { import RoundRobinRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "A new remote actor configured with a RoundRobin router" must { diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala index 93d60e72dd..04800d7c89 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala @@ -6,8 +6,8 @@ import akka.routing._ import akka.testkit._ import akka.util.duration._ -object ScatterGatherRoutedRemoteActorMultiJvmSpec { - val NrOfNodes = 4 +object ScatterGatherRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { + override def NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { case "hit" ⇒ sender ! context.system.nodename @@ -16,7 +16,7 @@ object ScatterGatherRoutedRemoteActorMultiJvmSpec { } import com.typesafe.config.ConfigFactory - val commonConfig = ConfigFactory.parseString(""" + override def commonConfig = ConfigFactory.parseString(""" akka { loglevel = "WARNING" actor { @@ -27,35 +27,10 @@ object ScatterGatherRoutedRemoteActorMultiJvmSpec { /service-hello.target.nodes = ["akka://AkkaRemoteSpec@localhost:9991","akka://AkkaRemoteSpec@localhost:9992","akka://AkkaRemoteSpec@localhost:9993"] } } - remote.server.hostname = "localhost" }""") - - val node1Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9991" - cluster.nodename = "node1" - }""") withFallback commonConfig - - val node2Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9992" - cluster.nodename = "node2" - }""") withFallback commonConfig - - val node3Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9993" - cluster.nodename = "node3" - }""") withFallback commonConfig - - val node4Config = ConfigFactory.parseString(""" - akka { - remote.server.port = "9994" - cluster.nodename = "node4" - }""") withFallback commonConfig } -class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.node1Config) { +class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) { import ScatterGatherRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { @@ -68,7 +43,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(Scatter } } -class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.node2Config) { +class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) { import ScatterGatherRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { @@ -81,7 +56,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(Scatter } } -class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.node3Config) { +class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(2)) { import ScatterGatherRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "___" must { @@ -94,7 +69,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(Scatter } } -class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.node4Config) +class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(3)) with DefaultTimeout with ImplicitSender { import ScatterGatherRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala new file mode 100755 index 0000000000..44a50448ce --- /dev/null +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2011 Typesafe + */ +package akka.remote + +import com.typesafe.config.Config +import org.apache.zookeeper._ +import ZooDefs.Ids + +class ZKClient(config: Config) extends Watcher { + // Don't forget to close! + lazy val zk: ZooKeeper = { + val remoteNodes = config.getString("akka.test.remote.nodes") split ',' map { + case hostport => hostport.split(":")(0) + } + + // ZkServers are configured to listen on a specific port. + val connectString = remoteNodes map (_+":2181") mkString "," + new ZooKeeper(connectString, 3000, this) + } + + def process(ev: WatchedEvent) { + synchronized { notify() } + } + + class ZkBarrier(name: String, count: Int, root: String) extends Barrier { + if (zk.exists(root, false) eq null) { + zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + } + + def enter() { + zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL) + while (true) { + ZKClient.this.synchronized { + if (zk.getChildren(root, true).size < count) { + ZKClient.this.wait() + } + } + } + } + + def leave() { + zk.delete(root + "/" + name, -1) + while (true) { + ZKClient.this.synchronized { + if (zk.getChildren(root, true).size > 0) { + ZKClient.this.wait() + } + } + } + } + } + + def barrier(name: String, count: Int, root: String) = new ZkBarrier(name, count, root) +}