diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/SimpleRemoteSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala similarity index 77% rename from akka-remote-tests/src/multi-jvm/scala/akka/remote/SimpleRemoteSpec.scala rename to akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala index dcc4b60526..797ff97ecd 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/SimpleRemoteSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala @@ -11,7 +11,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -object SimpleRemoteMultiJvmSpec extends MultiNodeConfig { +object LookupRemoteActorMultiJvmSpec extends MultiNodeConfig { class SomeActor extends Actor with Serializable { def receive = { @@ -26,12 +26,12 @@ object SimpleRemoteMultiJvmSpec extends MultiNodeConfig { } -class SimpleRemoteMultiJvmNode1 extends SimpleRemoteSpec -class SimpleRemoteMultiJvmNode2 extends SimpleRemoteSpec +class LookupRemoteActorMultiJvmNode1 extends LookupRemoteActorSpec +class LookupRemoteActorMultiJvmNode2 extends LookupRemoteActorSpec -class SimpleRemoteSpec extends MultiNodeSpec(SimpleRemoteMultiJvmSpec) +class LookupRemoteActorSpec extends MultiNodeSpec(LookupRemoteActorMultiJvmSpec) with ImplicitSender with DefaultTimeout { - import SimpleRemoteMultiJvmSpec._ + import LookupRemoteActorMultiJvmSpec._ def initialParticipants = 2 diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/DirectRoutedRemoteActorMultiJvmSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala similarity index 83% rename from akka-remote-tests/src/multi-jvm/scala/akka/remote/router/DirectRoutedRemoteActorMultiJvmSpec.scala rename to akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala index 294bc80884..4342a20178 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/DirectRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala @@ -14,7 +14,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -object DirectRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig { +object NewRemoteActorMultiJvmSpec extends MultiNodeConfig { class SomeActor extends Actor with Serializable { def receive = { @@ -28,20 +28,20 @@ object DirectRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig { val slave = role("slave") deployOn(master, """/service-hello.remote = "@slave@" """) - + deployOnAll("""/service-hello2.remote = "@slave@" """) } -class DirectRoutedRemoteActorMultiJvmNode1 extends DirectRoutedRemoteActorSpec -class DirectRoutedRemoteActorMultiJvmNode2 extends DirectRoutedRemoteActorSpec +class NewRemoteActorMultiJvmNode1 extends NewRemoteActorSpec +class NewRemoteActorMultiJvmNode2 extends NewRemoteActorSpec -class DirectRoutedRemoteActorSpec extends MultiNodeSpec(DirectRoutedRemoteActorMultiJvmSpec) +class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec) with ImplicitSender with DefaultTimeout { - import DirectRoutedRemoteActorMultiJvmSpec._ + import NewRemoteActorMultiJvmSpec._ def initialParticipants = 2 - "A new remote actor configured with a Direct router" must { + "A new remote actor" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { runOn(master) { diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/QuietReporter.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/QuietReporter.scala similarity index 100% rename from akka-remote/src/multi-jvm/scala/akka/remote/QuietReporter.scala rename to akka-remote-tests/src/multi-jvm/scala/akka/remote/QuietReporter.scala diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala new file mode 100644 index 0000000000..87bae463ce --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.remote.router + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.dispatch.Await +import akka.pattern.ask +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.routing.Broadcast +import akka.routing.RandomRouter +import akka.routing.RoutedActorRef +import akka.testkit._ + +object RandomRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig { + + class SomeActor extends Actor with Serializable { + def receive = { + case "hit" ⇒ sender ! self + case "end" ⇒ context.stop(self) + } + } + + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false)) + + deployOnAll(""" + /service-hello.router = "random" + /service-hello.nr-of-instances = 3 + /service-hello.target.nodes = ["@first@", "@second@", "@third@"] + """) +} + +class RandomRoutedRemoteActorMultiJvmNode1 extends RandomRoutedRemoteActorSpec +class RandomRoutedRemoteActorMultiJvmNode2 extends RandomRoutedRemoteActorSpec +class RandomRoutedRemoteActorMultiJvmNode3 extends RandomRoutedRemoteActorSpec +class RandomRoutedRemoteActorMultiJvmNode4 extends RandomRoutedRemoteActorSpec + +class RandomRoutedRemoteActorSpec extends MultiNodeSpec(RandomRoutedRemoteActorMultiJvmSpec) + with ImplicitSender with DefaultTimeout { + import RandomRoutedRemoteActorMultiJvmSpec._ + + def initialParticipants = 4 + + "A new remote actor configured with a Random router" must { + "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { + + runOn(first, second, third) { + testConductor.enter("start", "broadcast-end", "end", "done") + } + + runOn(fourth) { + testConductor.enter("start") + val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello") + actor.isInstanceOf[RoutedActorRef] must be(true) + + val connectionCount = 3 + val iterationCount = 10 + + var replies = Map( + node(first).address -> 0, + node(second).address -> 0, + node(third).address -> 0) + + for (i ← 0 until iterationCount) { + for (k ← 0 until connectionCount) { + val nodeAddress = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address + replies = replies + (nodeAddress -> (replies(nodeAddress) + 1)) + } + } + + testConductor.enter("broadcast-end") + actor ! Broadcast("end") + + testConductor.enter("end") + replies.values foreach { _ must be > (0) } + + // shut down the actor before we let the other node(s) shut down so we don't try to send + // "Terminate" to a shut down node + system.stop(actor) + testConductor.enter("done") + } + } + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala new file mode 100644 index 0000000000..48026af375 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.remote.router + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.dispatch.Await +import akka.pattern.ask +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.routing.Broadcast +import akka.routing.RoundRobinRouter +import akka.routing.RoutedActorRef +import akka.testkit._ + +object RoundRobinRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig { + + class SomeActor extends Actor with Serializable { + def receive = { + case "hit" ⇒ sender ! self + case "end" ⇒ context.stop(self) + } + } + + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false)) + + deployOnAll(""" + /service-hello.router = "round-robin" + /service-hello.nr-of-instances = 3 + /service-hello.target.nodes = ["@first@", "@second@", "@third@"] + """) +} + +class RoundRobinRoutedRemoteActorMultiJvmNode1 extends RoundRobinRoutedRemoteActorSpec +class RoundRobinRoutedRemoteActorMultiJvmNode2 extends RoundRobinRoutedRemoteActorSpec +class RoundRobinRoutedRemoteActorMultiJvmNode3 extends RoundRobinRoutedRemoteActorSpec +class RoundRobinRoutedRemoteActorMultiJvmNode4 extends RoundRobinRoutedRemoteActorSpec + +class RoundRobinRoutedRemoteActorSpec extends MultiNodeSpec(RoundRobinRoutedRemoteActorMultiJvmSpec) + with ImplicitSender with DefaultTimeout { + import RoundRobinRoutedRemoteActorMultiJvmSpec._ + + def initialParticipants = 4 + + "A new remote actor configured with a RoundRobin router" must { + "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { + + runOn(first, second, third) { + testConductor.enter("start", "broadcast-end", "end", "done") + } + + runOn(fourth) { + testConductor.enter("start") + val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") + actor.isInstanceOf[RoutedActorRef] must be(true) + + val connectionCount = 3 + val iterationCount = 10 + + var replies = Map( + node(first).address -> 0, + node(second).address -> 0, + node(third).address -> 0) + + for (i ← 0 until iterationCount) { + for (k ← 0 until connectionCount) { + val nodeAddress = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address + replies = replies + (nodeAddress -> (replies(nodeAddress) + 1)) + } + } + + testConductor.enter("broadcast-end") + actor ! Broadcast("end") + + testConductor.enter("end") + replies.values foreach { _ must be(10) } + + // shut down the actor before we let the other node(s) shut down so we don't try to send + // "Terminate" to a shut down node + system.stop(actor) + testConductor.enter("done") + } + } + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala new file mode 100644 index 0000000000..7afa86d22e --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.remote.router + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.dispatch.Await +import akka.pattern.ask +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.routing.Broadcast +import akka.routing.ScatterGatherFirstCompletedRouter +import akka.routing.RoutedActorRef +import akka.testkit._ +import akka.util.duration._ + +object ScatterGatherRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig { + + class SomeActor extends Actor with Serializable { + def receive = { + case "hit" ⇒ sender ! self + case "end" ⇒ context.stop(self) + } + } + + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false)) + + deployOnAll(""" + /service-hello.router = "scatter-gather" + /service-hello.nr-of-instances = 3 + /service-hello.target.nodes = ["@first@", "@second@", "@third@"] + """) +} + +class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends ScatterGatherRoutedRemoteActorSpec +class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends ScatterGatherRoutedRemoteActorSpec +class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends ScatterGatherRoutedRemoteActorSpec +class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends ScatterGatherRoutedRemoteActorSpec + +class ScatterGatherRoutedRemoteActorSpec extends MultiNodeSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec) + with ImplicitSender with DefaultTimeout { + import ScatterGatherRoutedRemoteActorMultiJvmSpec._ + + def initialParticipants = 4 + + "A new remote actor configured with a ScatterGatherFirstCompleted router" must { + "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { + + runOn(first, second, third) { + testConductor.enter("start", "broadcast-end", "end", "done") + } + + runOn(fourth) { + testConductor.enter("start") + val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello") + actor.isInstanceOf[RoutedActorRef] must be(true) + + val connectionCount = 3 + val iterationCount = 10 + + for (i ← 0 until iterationCount) { + for (k ← 0 until connectionCount) { + actor ! "hit" + } + } + + val replies = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) { + case ref: ActorRef ⇒ (ref.path.address, 1) + }).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) { + case (m, (n, c)) ⇒ m + (n -> (m(n) + c)) + } + + testConductor.enter("broadcast-end") + actor ! Broadcast("end") + + testConductor.enter("end") + replies.values.sum must be === connectionCount * iterationCount + + // shut down the actor before we let the other node(s) shut down so we don't try to send + // "Terminate" to a shut down node + system.stop(actor) + testConductor.enter("done") + } + } + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala index df6388d562..1d58b48a00 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala @@ -3,9 +3,7 @@ */ package akka.remote.testconductor -import akka.remote.AkkaRemoteSpec import com.typesafe.config.ConfigFactory -import akka.remote.AbstractRemoteActorMultiJvmSpec import akka.actor.Props import akka.actor.Actor import akka.dispatch.Await @@ -20,7 +18,7 @@ import akka.remote.testkit.MultiNodeConfig object TestConductorMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false)) - + val master = role("master") val slave = role("slave") } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala deleted file mode 100644 index ca4313b56b..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala +++ /dev/null @@ -1,29 +0,0 @@ -package akka.remote - -import com.typesafe.config.{Config, ConfigFactory} -import akka.actor.Address - -trait AbstractRemoteActorMultiJvmSpec { - def NrOfNodes: Int - def commonConfig: Config - - def PortRangeStart = 1990 - def NodeRange = 1 to NrOfNodes - - private[this] val remotes: IndexedSeq[String] = { - val nodesOpt = Option(AkkaRemoteSpec.testNodes).map(_.split(",").toIndexedSeq) - nodesOpt getOrElse IndexedSeq.fill(NrOfNodes)("localhost") - } - - val nodeConfigs = (NodeRange.toList zip remotes) map { - case (port, host) => - ConfigFactory.parseString(""" - akka { - remote.netty.hostname="%s" - remote.netty.port = "%d" - }""".format(host, PortRangeStart + port, port)) withFallback commonConfig - } - - def akkaSpec(port: Int) = "AkkaRemoteSpec@%s:%d".format(remotes(port), PortRangeStart + 1 + port) - def akkaURIs(count: Int): String = 0 until count map {idx => "\"akka://" + akkaSpec(idx) + "\""} mkString "," -} diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala deleted file mode 100644 index c1a2109bc0..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.remote - -import akka.testkit._ -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import com.typesafe.config.ConfigParseOptions -import com.typesafe.config.ConfigResolveOptions -import java.io.File -import akka.actor.{ActorSystem, ActorSystemImpl} - -object AkkaRemoteSpec { - private def configParseOptions = ConfigParseOptions.defaults.setAllowMissing(false) - - val testConf: Config = { - System.getProperty("akka.config") match { - case null ⇒ AkkaSpec.testConf - case location ⇒ - ConfigFactory.systemProperties - .withFallback(ConfigFactory.parseFileAnySyntax(new File(location), configParseOptions)) - .withFallback(ConfigFactory.defaultReference(ActorSystem.findClassLoader())).resolve(ConfigResolveOptions.defaults) - } - } - - val testNodes = System.getProperty("test.hosts") -} - -abstract class AkkaRemoteSpec(config: Config) - extends AkkaSpec(config.withFallback(AkkaRemoteSpec.testConf)) - with MultiJvmSync diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/Barrier.scala b/akka-remote/src/multi-jvm/scala/akka/remote/Barrier.scala deleted file mode 100644 index e99fca2a45..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/Barrier.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.remote - -trait Barrier { - def await() = { enter(); leave() } - - def apply(body: ⇒ Unit) { - enter() - body - leave() - } - - def enter(): Unit - - def leave(): Unit -} diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala deleted file mode 100644 index 3026ddd613..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala +++ /dev/null @@ -1,64 +0,0 @@ -package akka.remote - -import akka.actor.{ Actor, ActorRef, Props } -import akka.testkit._ -import akka.dispatch.Await -import akka.pattern.ask - -object DirectRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { - override def NrOfNodes = 2 - - class SomeActor extends Actor with Serializable { - def receive = { - case "identify" ⇒ sender ! self - } - } - - import com.typesafe.config.ConfigFactory - override def commonConfig = ConfigFactory.parseString(""" - akka { - loglevel = "WARNING" - actor { - provider = "akka.remote.RemoteActorRefProvider" - deployment { - /service-hello.remote = %s - } - } - }""" format akkaURIs(1)) -} - -import DirectRoutedRemoteActorMultiJvmSpec._ - -class DirectRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(nodeConfigs(0)) { - import DirectRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - - "___" must { - "___" in { - barrier("start") - barrier("done") - } - } -} - -class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(nodeConfigs(1)) with DefaultTimeout { - - import DirectRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - - "A new remote actor configured with a Direct router" must { - "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - barrier("start") - - val actor = system.actorOf(Props[SomeActor], "service-hello") - actor.isInstanceOf[RemoteActorRef] must be(true) - - Await.result(actor ? "identify", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort must equal(akkaSpec(0)) - - // shut down the actor before we let the other node(s) shut down so we don't try to send - // "Terminate" to a shut down node - system.stop(actor) - barrier("done") - } - } -} diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/FileBasedBarrier.scala b/akka-remote/src/multi-jvm/scala/akka/remote/FileBasedBarrier.scala deleted file mode 100644 index a1773fc86e..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/FileBasedBarrier.scala +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.remote - -import akka.util.duration._ -import akka.util.Duration -import System.{ currentTimeMillis ⇒ now } - -import java.io.File - -class BarrierTimeoutException(message: String) extends RuntimeException(message) - -object FileBasedBarrier { - val HomeDir = ".multi-jvm" - val DefaultTimeout = 30.seconds - val DefaultSleep = 100.millis -} - -import FileBasedBarrier._ - -class FileBasedBarrier( - name: String, - count: Int, - group: String, - node: String, - timeout: Duration = FileBasedBarrier.DefaultTimeout, - sleep: Duration = FileBasedBarrier.DefaultSleep) extends Barrier { - - val barrierDir = { - val dir = new File(new File(new File(FileBasedBarrier.HomeDir), group), name) - dir.mkdirs() - dir - } - - val nodeFile = new File(barrierDir, node) - - val readyFile = new File(barrierDir, "ready") - - def enter() = { - createNode() - if (nodesPresent >= count) createReady() - val ready = waitFor(readyFile.exists, timeout, sleep) - if (!ready) expire("entry") - } - - def leave() = { - removeNode() - val empty = waitFor(nodesPresent <= 1, timeout, sleep) - removeReady() - if (!empty) expire("exit") - } - - def nodesPresent = barrierDir.list.size - - def createNode() = nodeFile.createNewFile() - - def removeNode() = nodeFile.delete() - - def createReady() = readyFile.createNewFile() - - def removeReady() = readyFile.delete() - - def waitFor(test: ⇒ Boolean, timeout: Duration, sleep: Duration): Boolean = { - val start = now - val limit = start + timeout.toMillis - var passed = test - var expired = false - while (!passed && !expired) { - if (now > limit) expired = true - else { - Thread.sleep(sleep.toMillis) - passed = test - } - } - passed - } - - def expire(barrier: String) = { - throw new BarrierTimeoutException("Timeout (%s) waiting for %s barrier" format (timeout, barrier)) - } -} diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala b/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala deleted file mode 100644 index c1e6080e6e..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.remote - -import akka.testkit.AkkaSpec -import akka.util.Duration - -trait MultiJvmSync extends AkkaSpec { - def nodes: Int - - override def atStartup() = { - onStart() - MultiJvmSync.start(getClass.getName, nodes) - } - - def onStart() {} - - override def atTermination() = { - MultiJvmSync.end(getClass.getName, nodes) - onEnd() - } - - def onEnd() {} - - def barrier(name: String, timeout: Duration = FileBasedBarrier.DefaultTimeout) = { - MultiJvmSync.barrier(name, nodes, getClass.getName, timeout) - } -} - -object MultiJvmSync { - val TestMarker = "MultiJvm" - val StartBarrier = "multi-jvm-start" - val EndBarrier = "multi-jvm-end" - - def start(className: String, count: Int) = barrier(StartBarrier, count, className) - - def end(className: String, count: Int) = barrier(EndBarrier, count, className) - - def barrier(name: String, count: Int, className: String, timeout: Duration = FileBasedBarrier.DefaultTimeout) = { - val Array(testName, nodeName) = className split TestMarker - val barrier = if (AkkaRemoteSpec.testNodes eq null) - new FileBasedBarrier(name, count, testName, nodeName, timeout) - else - new ZkClient.ZkBarrier(nodeName, count, "/" + testName + "_" + name) - barrier.await() - } -} diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala deleted file mode 100644 index c3dc1ae9de..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala +++ /dev/null @@ -1,65 +0,0 @@ -package akka.remote - -import akka.actor.{ Actor, ActorRef, Props } -import akka.testkit._ -import akka.dispatch.Await -import akka.pattern.ask - -object NewRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { - override def NrOfNodes = 2 - - class SomeActor extends Actor with Serializable { - def receive = { - case "identify" ⇒ sender ! self - } - } - - import com.typesafe.config.ConfigFactory - override def commonConfig = ConfigFactory.parseString(""" - akka { - loglevel = "WARNING" - actor { - provider = "akka.remote.RemoteActorRefProvider" - deployment { - /service-hello.remote = %s - } - } - }""" format akkaURIs(1)) -} - -class NewRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(NewRemoteActorMultiJvmSpec.nodeConfigs(0)) { - - import NewRemoteActorMultiJvmSpec._ - - val nodes = NrOfNodes - - "___" must { - "___" in { - barrier("start") - - barrier("done") - } - } -} - -class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(NewRemoteActorMultiJvmSpec.nodeConfigs(1)) with DefaultTimeout { - - import NewRemoteActorMultiJvmSpec._ - - val nodes = NrOfNodes - - "A new remote actor" must { - "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - barrier("start") - - val actor = system.actorOf(Props[SomeActor], "service-hello") - Await.result(actor ? "identify", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort must equal(akkaSpec(0)) - - // shut down the actor before we let the other node(s) shut down so we don't try to send - // "Terminate" to a shut down node - system.stop(actor) - barrier("done") - } - } -} - diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala deleted file mode 100644 index 2b2b233dee..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala +++ /dev/null @@ -1,110 +0,0 @@ -package akka.remote - -import akka.actor.{ Actor, ActorRef, Props } -import akka.routing._ -import akka.testkit.DefaultTimeout -import akka.dispatch.Await -import akka.pattern.ask - -object RandomRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { - override def NrOfNodes = 4 - class SomeActor extends Actor with Serializable { - def receive = { - case "hit" ⇒ sender ! self - case "end" ⇒ context.stop(self) - } - } - - import com.typesafe.config.ConfigFactory - override def commonConfig = ConfigFactory.parseString(""" - akka { - loglevel = "WARNING" - actor { - provider = "akka.remote.RemoteActorRefProvider" - deployment { - /service-hello.router = "random" - /service-hello.nr-of-instances = %d - /service-hello.target.nodes = [%s] - } - } - }""" format (3, akkaURIs(3))) -} - -class RandomRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) { - import RandomRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "___" must { - "___" in { - barrier("start") - barrier("broadcast-end") - barrier("end") - barrier("done") - } - } -} - -class RandomRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) { - import RandomRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "___" must { - "___" in { - barrier("start") - barrier("broadcast-end") - barrier("end") - barrier("done") - } - } -} - -class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(2)) { - import RandomRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "___" must { - "___" in { - barrier("start") - barrier("broadcast-end") - barrier("end") - barrier("done") - } - } -} - -class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(3)) with DefaultTimeout { - import RandomRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "A new remote actor configured with a Random router" must { - "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - - barrier("start") - val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello") - actor.isInstanceOf[RoutedActorRef] must be(true) - - val connectionCount = NrOfNodes - 1 - val iterationCount = 10 - - var replies = Map( - akkaSpec(0) -> 0, - akkaSpec(1) -> 0, - akkaSpec(2) -> 0) - - for (i ← 0 until iterationCount) { - for (k ← 0 until connectionCount) { - val nodeName = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort - replies = replies + (nodeName -> (replies(nodeName) + 1)) - } - } - - barrier("broadcast-end") - actor ! Broadcast("end") - - barrier("end") - replies.values foreach { _ must be > (0) } - - // shut down the actor before we let the other node(s) shut down so we don't try to send - // "Terminate" to a shut down node - system.stop(actor) - barrier("done") - } - } -} - diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala deleted file mode 100644 index c84aa46366..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ /dev/null @@ -1,112 +0,0 @@ -package akka.remote - -import akka.actor.{ Actor, ActorRef, Props } -import akka.routing._ -import akka.testkit.DefaultTimeout -import akka.dispatch.Await -import akka.pattern.ask - -object RoundRobinRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { - override def NrOfNodes = 4 - - class SomeActor extends Actor with Serializable { - def receive = { - case "hit" ⇒ sender ! self - case "end" ⇒ context.stop(self) - } - } - - import com.typesafe.config.ConfigFactory - override def commonConfig = ConfigFactory.parseString(""" - akka { - loglevel = "WARNING" - actor { - provider = "akka.remote.RemoteActorRefProvider" - deployment { - /service-hello.router = "round-robin" - /service-hello.nr-of-instances = %d - /service-hello.target.nodes = [%s] - } - } - }""" format (3, akkaURIs(3))) -} - -class RoundRobinRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) { - import RoundRobinRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "___" must { - "___" in { - barrier("start") - barrier("broadcast-end") - barrier("end") - barrier("done") - } - } -} - -class RoundRobinRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) { - import RoundRobinRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "___" must { - "___" in { - barrier("start") - barrier("broadcast-end") - barrier("end") - barrier("done") - } - } -} - -class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(2)) { - import RoundRobinRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "___" must { - "___" in { - barrier("start") - barrier("broadcast-end") - barrier("end") - barrier("done") - } - } -} - -class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(3)) with DefaultTimeout { - import RoundRobinRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "A new remote actor configured with a RoundRobin router" must { - "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - - barrier("start") - val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") - actor.isInstanceOf[RoutedActorRef] must be(true) - - val connectionCount = NrOfNodes - 1 - val iterationCount = 10 - - var replies = Map( - akkaSpec(0) -> 0, - akkaSpec(1) -> 0, - akkaSpec(2) -> 0) - - for (i ← 0 until iterationCount) { - for (k ← 0 until connectionCount) { - val nodeName = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort - - replies = replies + (nodeName -> (replies(nodeName) + 1)) - } - } - - barrier("broadcast-end") - actor ! Broadcast("end") - - barrier("end") - replies.values foreach { _ must be(10) } - - // shut down the actor before we let the other node(s) shut down so we don't try to send - // "Terminate" to a shut down node - system.stop(actor) - barrier("done") - } - } -} - diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala deleted file mode 100644 index b618300ff2..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ /dev/null @@ -1,107 +0,0 @@ -package akka.remote - -import akka.actor.{ Actor, ActorRef, Props } -import akka.routing._ -import akka.testkit._ -import akka.util.duration._ - -object ScatterGatherRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { - override def NrOfNodes = 4 - class SomeActor extends Actor with Serializable { - def receive = { - case "hit" ⇒ sender ! self - case "end" ⇒ context.stop(self) - } - } - - import com.typesafe.config.ConfigFactory - override def commonConfig = ConfigFactory.parseString(""" - akka { - loglevel = "WARNING" - actor { - provider = "akka.remote.RemoteActorRefProvider" - deployment { - /service-hello.router = "scatter-gather" - /service-hello.nr-of-instances = %d - /service-hello.target.nodes = [%s] - } - } - }""" format (3, akkaURIs(3))) -} - -class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) { - import ScatterGatherRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "___" must { - "___" in { - barrier("start") - barrier("broadcast-end") - barrier("end") - barrier("done") - } - } -} - -class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) { - import ScatterGatherRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "___" must { - "___" in { - barrier("start") - barrier("broadcast-end") - barrier("end") - barrier("done") - } - } -} - -class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(2)) { - import ScatterGatherRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "___" must { - "___" in { - barrier("start") - barrier("broadcast-end") - barrier("end") - barrier("done") - } - } -} - -class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(3)) - with DefaultTimeout with ImplicitSender { - import ScatterGatherRoutedRemoteActorMultiJvmSpec._ - val nodes = NrOfNodes - "A new remote actor configured with a ScatterGather router" must { - "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - - barrier("start") - val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello") - actor.isInstanceOf[RoutedActorRef] must be(true) - - val connectionCount = NrOfNodes - 1 - val iterationCount = 10 - - for (i ← 0 until iterationCount) { - for (k ← 0 until connectionCount) { - actor ! "hit" - } - } - - val replies = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) { - case ref: ActorRef ⇒ (ref.path.address.hostPort, 1) - }).foldLeft(Map(akkaSpec(0) -> 0, akkaSpec(1) -> 0, akkaSpec(2) -> 0)) { - case (m, (n, c)) ⇒ m + (n -> (m(n) + c)) - } - - barrier("broadcast-end") - actor ! Broadcast("end") - - barrier("end") - replies.values.sum must be === connectionCount * iterationCount - - barrier("done") - } - } -} - diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala deleted file mode 100644 index 611478babb..0000000000 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright (C) 2011-2012 Typesafe - */ -package akka.remote - -import org.apache.zookeeper._ -import ZooDefs.Ids - -object ZkClient extends Watcher { - // Don't forget to close! - lazy val zk: ZooKeeper = { - val remoteNodes = AkkaRemoteSpec.testNodes split ',' - - // ZkServers are configured to listen on a specific port. - val connectString = remoteNodes map (_+":2181") mkString "," - new ZooKeeper(connectString, 3000, this) - } - - def process(ev: WatchedEvent) { - synchronized { notify() } - } - - class ZkBarrier(name: String, count: Int, root: String) extends Barrier { - @annotation.tailrec - private def waitForServer() { - // SI-1672 - val r = try { - zk.exists("/", false) - true - } catch { - case _: KeeperException.ConnectionLossException => - Thread.sleep(10000) - false - } - if (!r) waitForServer() - } - waitForServer() - - try zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) catch { - case _: KeeperException.NodeExistsException => - } - - val timeoutMs = 300*1000 - - private def block(num: Int) { - val start = System.currentTimeMillis - while (true) { - if (System.currentTimeMillis - start > timeoutMs) throw new InterruptedException("Timed out blocking in zk") - ZkClient.this.synchronized { - val children = zk.getChildren(root, true) - if (children.size < num) { - ZkClient.this.wait(timeoutMs) - } else - return - } - } - } - - def enter() { - zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) - block(count) - } - - final def leave() { - zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) - block(2*count) - } - } - - def barrier(name: String, count: Int, root: String) = new ZkBarrier(name, count, root) -} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 7962bb6a29..e427255782 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -82,23 +82,17 @@ object AkkaBuild extends Build { id = "akka-remote", base = file("akka-remote"), dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"), - settings = defaultSettings ++ multiJvmSettings ++ OSGi.remote ++ Seq( + settings = defaultSettings ++ OSGi.remote ++ Seq( libraryDependencies ++= Dependencies.remote, // disable parallel tests - parallelExecution in Test := false, - extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => - (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq - }, - scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, - jvmOptions in MultiJvm := defaultMultiJvmOptions, - test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x } + parallelExecution in Test := false ) - ) configs (MultiJvm) + ) lazy val remoteTests = Project( id = "akka-remote-tests", base = file("akka-remote-tests"), - dependencies = Seq(remote % "compile;test->test;multi-jvm->multi-jvm", actorTests % "test->test", testkit % "test->test"), + dependencies = Seq(remote, actorTests % "test->test", testkit % "test->test"), settings = defaultSettings ++ multiJvmSettings ++ Seq( // disable parallel tests parallelExecution in Test := false, @@ -415,8 +409,7 @@ object Dependencies { ) val remote = Seq( - netty, protobuf, Test.junit, Test.scalatest, - Test.zookeeper, Test.log4j // needed for ZkBarrier in multi-jvm tests + netty, protobuf, Test.junit, Test.scalatest ) val cluster = Seq(Test.junit, Test.scalatest) @@ -482,8 +475,6 @@ object Dependency { val scalatest = "org.scalatest" % "scalatest_2.9.1" % V.Scalatest % "test" // ApacheV2 val scalacheck = "org.scala-tools.testing" % "scalacheck_2.9.1" % "1.9" % "test" // New BSD val specs2 = "org.specs2" % "specs2_2.9.1" % "1.9" % "test" // Modified BSD / ApacheV2 - val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % "3.4.0" % "test" // ApacheV2 - val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2 } }