Port all MultiJvm tests to MultiNode, see #1935

* Moved akka-remote/multi-jvm test to akka-remote-tests/multi-jvm
* Removed old test utilities that are replaced by testconductor
* Removed multi-jvm from akka-remote build, these tests are now in
  akka-remote-tests
* Removed test dependencies in build that are not needed any longer
* DirectRoutedRemoteActorMultiJvmSpec replaced with
  NewRemoteActorMultiJvmSpec, same thing
This commit is contained in:
Patrik Nordwall 2012-05-29 08:28:54 +02:00
parent b777483193
commit b9a6ccaf41
19 changed files with 295 additions and 771 deletions

View file

@ -11,7 +11,7 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
object SimpleRemoteMultiJvmSpec extends MultiNodeConfig {
object LookupRemoteActorMultiJvmSpec extends MultiNodeConfig {
class SomeActor extends Actor with Serializable {
def receive = {
@ -26,12 +26,12 @@ object SimpleRemoteMultiJvmSpec extends MultiNodeConfig {
}
class SimpleRemoteMultiJvmNode1 extends SimpleRemoteSpec
class SimpleRemoteMultiJvmNode2 extends SimpleRemoteSpec
class LookupRemoteActorMultiJvmNode1 extends LookupRemoteActorSpec
class LookupRemoteActorMultiJvmNode2 extends LookupRemoteActorSpec
class SimpleRemoteSpec extends MultiNodeSpec(SimpleRemoteMultiJvmSpec)
class LookupRemoteActorSpec extends MultiNodeSpec(LookupRemoteActorMultiJvmSpec)
with ImplicitSender with DefaultTimeout {
import SimpleRemoteMultiJvmSpec._
import LookupRemoteActorMultiJvmSpec._
def initialParticipants = 2

View file

@ -14,7 +14,7 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
object DirectRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
object NewRemoteActorMultiJvmSpec extends MultiNodeConfig {
class SomeActor extends Actor with Serializable {
def receive = {
@ -28,20 +28,20 @@ object DirectRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
val slave = role("slave")
deployOn(master, """/service-hello.remote = "@slave@" """)
deployOnAll("""/service-hello2.remote = "@slave@" """)
}
class DirectRoutedRemoteActorMultiJvmNode1 extends DirectRoutedRemoteActorSpec
class DirectRoutedRemoteActorMultiJvmNode2 extends DirectRoutedRemoteActorSpec
class NewRemoteActorMultiJvmNode1 extends NewRemoteActorSpec
class NewRemoteActorMultiJvmNode2 extends NewRemoteActorSpec
class DirectRoutedRemoteActorSpec extends MultiNodeSpec(DirectRoutedRemoteActorMultiJvmSpec)
class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec)
with ImplicitSender with DefaultTimeout {
import DirectRoutedRemoteActorMultiJvmSpec._
import NewRemoteActorMultiJvmSpec._
def initialParticipants = 2
"A new remote actor configured with a Direct router" must {
"A new remote actor" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
runOn(master) {

View file

@ -0,0 +1,92 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.router
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.dispatch.Await
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.routing.Broadcast
import akka.routing.RandomRouter
import akka.routing.RoutedActorRef
import akka.testkit._
object RandomRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" sender ! self
case "end" context.stop(self)
}
}
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false))
deployOnAll("""
/service-hello.router = "random"
/service-hello.nr-of-instances = 3
/service-hello.target.nodes = ["@first@", "@second@", "@third@"]
""")
}
class RandomRoutedRemoteActorMultiJvmNode1 extends RandomRoutedRemoteActorSpec
class RandomRoutedRemoteActorMultiJvmNode2 extends RandomRoutedRemoteActorSpec
class RandomRoutedRemoteActorMultiJvmNode3 extends RandomRoutedRemoteActorSpec
class RandomRoutedRemoteActorMultiJvmNode4 extends RandomRoutedRemoteActorSpec
class RandomRoutedRemoteActorSpec extends MultiNodeSpec(RandomRoutedRemoteActorMultiJvmSpec)
with ImplicitSender with DefaultTimeout {
import RandomRoutedRemoteActorMultiJvmSpec._
def initialParticipants = 4
"A new remote actor configured with a Random router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
runOn(first, second, third) {
testConductor.enter("start", "broadcast-end", "end", "done")
}
runOn(fourth) {
testConductor.enter("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true)
val connectionCount = 3
val iterationCount = 10
var replies = Map(
node(first).address -> 0,
node(second).address -> 0,
node(third).address -> 0)
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
val nodeAddress = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address
replies = replies + (nodeAddress -> (replies(nodeAddress) + 1))
}
}
testConductor.enter("broadcast-end")
actor ! Broadcast("end")
testConductor.enter("end")
replies.values foreach { _ must be > (0) }
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
testConductor.enter("done")
}
}
}
}

View file

@ -0,0 +1,92 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.router
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.dispatch.Await
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.routing.Broadcast
import akka.routing.RoundRobinRouter
import akka.routing.RoutedActorRef
import akka.testkit._
object RoundRobinRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" sender ! self
case "end" context.stop(self)
}
}
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false))
deployOnAll("""
/service-hello.router = "round-robin"
/service-hello.nr-of-instances = 3
/service-hello.target.nodes = ["@first@", "@second@", "@third@"]
""")
}
class RoundRobinRoutedRemoteActorMultiJvmNode1 extends RoundRobinRoutedRemoteActorSpec
class RoundRobinRoutedRemoteActorMultiJvmNode2 extends RoundRobinRoutedRemoteActorSpec
class RoundRobinRoutedRemoteActorMultiJvmNode3 extends RoundRobinRoutedRemoteActorSpec
class RoundRobinRoutedRemoteActorMultiJvmNode4 extends RoundRobinRoutedRemoteActorSpec
class RoundRobinRoutedRemoteActorSpec extends MultiNodeSpec(RoundRobinRoutedRemoteActorMultiJvmSpec)
with ImplicitSender with DefaultTimeout {
import RoundRobinRoutedRemoteActorMultiJvmSpec._
def initialParticipants = 4
"A new remote actor configured with a RoundRobin router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
runOn(first, second, third) {
testConductor.enter("start", "broadcast-end", "end", "done")
}
runOn(fourth) {
testConductor.enter("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true)
val connectionCount = 3
val iterationCount = 10
var replies = Map(
node(first).address -> 0,
node(second).address -> 0,
node(third).address -> 0)
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
val nodeAddress = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address
replies = replies + (nodeAddress -> (replies(nodeAddress) + 1))
}
}
testConductor.enter("broadcast-end")
actor ! Broadcast("end")
testConductor.enter("end")
replies.values foreach { _ must be(10) }
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
testConductor.enter("done")
}
}
}
}

View file

@ -0,0 +1,93 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.router
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.dispatch.Await
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.routing.Broadcast
import akka.routing.ScatterGatherFirstCompletedRouter
import akka.routing.RoutedActorRef
import akka.testkit._
import akka.util.duration._
object ScatterGatherRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" sender ! self
case "end" context.stop(self)
}
}
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false))
deployOnAll("""
/service-hello.router = "scatter-gather"
/service-hello.nr-of-instances = 3
/service-hello.target.nodes = ["@first@", "@second@", "@third@"]
""")
}
class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends ScatterGatherRoutedRemoteActorSpec
class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends ScatterGatherRoutedRemoteActorSpec
class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends ScatterGatherRoutedRemoteActorSpec
class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends ScatterGatherRoutedRemoteActorSpec
class ScatterGatherRoutedRemoteActorSpec extends MultiNodeSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec)
with ImplicitSender with DefaultTimeout {
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
def initialParticipants = 4
"A new remote actor configured with a ScatterGatherFirstCompleted router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
runOn(first, second, third) {
testConductor.enter("start", "broadcast-end", "end", "done")
}
runOn(fourth) {
testConductor.enter("start")
val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true)
val connectionCount = 3
val iterationCount = 10
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
actor ! "hit"
}
}
val replies = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
case ref: ActorRef (ref.path.address, 1)
}).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) {
case (m, (n, c)) m + (n -> (m(n) + c))
}
testConductor.enter("broadcast-end")
actor ! Broadcast("end")
testConductor.enter("end")
replies.values.sum must be === connectionCount * iterationCount
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
testConductor.enter("done")
}
}
}
}

View file

@ -3,9 +3,7 @@
*/
package akka.remote.testconductor
import akka.remote.AkkaRemoteSpec
import com.typesafe.config.ConfigFactory
import akka.remote.AbstractRemoteActorMultiJvmSpec
import akka.actor.Props
import akka.actor.Actor
import akka.dispatch.Await
@ -20,7 +18,7 @@ import akka.remote.testkit.MultiNodeConfig
object TestConductorMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false))
val master = role("master")
val slave = role("slave")
}

View file

@ -1,29 +0,0 @@
package akka.remote
import com.typesafe.config.{Config, ConfigFactory}
import akka.actor.Address
trait AbstractRemoteActorMultiJvmSpec {
def NrOfNodes: Int
def commonConfig: Config
def PortRangeStart = 1990
def NodeRange = 1 to NrOfNodes
private[this] val remotes: IndexedSeq[String] = {
val nodesOpt = Option(AkkaRemoteSpec.testNodes).map(_.split(",").toIndexedSeq)
nodesOpt getOrElse IndexedSeq.fill(NrOfNodes)("localhost")
}
val nodeConfigs = (NodeRange.toList zip remotes) map {
case (port, host) =>
ConfigFactory.parseString("""
akka {
remote.netty.hostname="%s"
remote.netty.port = "%d"
}""".format(host, PortRangeStart + port, port)) withFallback commonConfig
}
def akkaSpec(port: Int) = "AkkaRemoteSpec@%s:%d".format(remotes(port), PortRangeStart + 1 + port)
def akkaURIs(count: Int): String = 0 until count map {idx => "\"akka://" + akkaSpec(idx) + "\""} mkString ","
}

View file

@ -1,33 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.testkit._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigResolveOptions
import java.io.File
import akka.actor.{ActorSystem, ActorSystemImpl}
object AkkaRemoteSpec {
private def configParseOptions = ConfigParseOptions.defaults.setAllowMissing(false)
val testConf: Config = {
System.getProperty("akka.config") match {
case null AkkaSpec.testConf
case location
ConfigFactory.systemProperties
.withFallback(ConfigFactory.parseFileAnySyntax(new File(location), configParseOptions))
.withFallback(ConfigFactory.defaultReference(ActorSystem.findClassLoader())).resolve(ConfigResolveOptions.defaults)
}
}
val testNodes = System.getProperty("test.hosts")
}
abstract class AkkaRemoteSpec(config: Config)
extends AkkaSpec(config.withFallback(AkkaRemoteSpec.testConf))
with MultiJvmSync

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
trait Barrier {
def await() = { enter(); leave() }
def apply(body: Unit) {
enter()
body
leave()
}
def enter(): Unit
def leave(): Unit
}

View file

@ -1,64 +0,0 @@
package akka.remote
import akka.actor.{ Actor, ActorRef, Props }
import akka.testkit._
import akka.dispatch.Await
import akka.pattern.ask
object DirectRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec {
override def NrOfNodes = 2
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" sender ! self
}
}
import com.typesafe.config.ConfigFactory
override def commonConfig = ConfigFactory.parseString("""
akka {
loglevel = "WARNING"
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/service-hello.remote = %s
}
}
}""" format akkaURIs(1))
}
import DirectRoutedRemoteActorMultiJvmSpec._
class DirectRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(nodeConfigs(0)) {
import DirectRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("done")
}
}
}
class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(nodeConfigs(1)) with DefaultTimeout {
import DirectRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"A new remote actor configured with a Direct router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
barrier("start")
val actor = system.actorOf(Props[SomeActor], "service-hello")
actor.isInstanceOf[RemoteActorRef] must be(true)
Await.result(actor ? "identify", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort must equal(akkaSpec(0))
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
barrier("done")
}
}
}

View file

@ -1,83 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.util.duration._
import akka.util.Duration
import System.{ currentTimeMillis now }
import java.io.File
class BarrierTimeoutException(message: String) extends RuntimeException(message)
object FileBasedBarrier {
val HomeDir = ".multi-jvm"
val DefaultTimeout = 30.seconds
val DefaultSleep = 100.millis
}
import FileBasedBarrier._
class FileBasedBarrier(
name: String,
count: Int,
group: String,
node: String,
timeout: Duration = FileBasedBarrier.DefaultTimeout,
sleep: Duration = FileBasedBarrier.DefaultSleep) extends Barrier {
val barrierDir = {
val dir = new File(new File(new File(FileBasedBarrier.HomeDir), group), name)
dir.mkdirs()
dir
}
val nodeFile = new File(barrierDir, node)
val readyFile = new File(barrierDir, "ready")
def enter() = {
createNode()
if (nodesPresent >= count) createReady()
val ready = waitFor(readyFile.exists, timeout, sleep)
if (!ready) expire("entry")
}
def leave() = {
removeNode()
val empty = waitFor(nodesPresent <= 1, timeout, sleep)
removeReady()
if (!empty) expire("exit")
}
def nodesPresent = barrierDir.list.size
def createNode() = nodeFile.createNewFile()
def removeNode() = nodeFile.delete()
def createReady() = readyFile.createNewFile()
def removeReady() = readyFile.delete()
def waitFor(test: Boolean, timeout: Duration, sleep: Duration): Boolean = {
val start = now
val limit = start + timeout.toMillis
var passed = test
var expired = false
while (!passed && !expired) {
if (now > limit) expired = true
else {
Thread.sleep(sleep.toMillis)
passed = test
}
}
passed
}
def expire(barrier: String) = {
throw new BarrierTimeoutException("Timeout (%s) waiting for %s barrier" format (timeout, barrier))
}
}

View file

@ -1,49 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.testkit.AkkaSpec
import akka.util.Duration
trait MultiJvmSync extends AkkaSpec {
def nodes: Int
override def atStartup() = {
onStart()
MultiJvmSync.start(getClass.getName, nodes)
}
def onStart() {}
override def atTermination() = {
MultiJvmSync.end(getClass.getName, nodes)
onEnd()
}
def onEnd() {}
def barrier(name: String, timeout: Duration = FileBasedBarrier.DefaultTimeout) = {
MultiJvmSync.barrier(name, nodes, getClass.getName, timeout)
}
}
object MultiJvmSync {
val TestMarker = "MultiJvm"
val StartBarrier = "multi-jvm-start"
val EndBarrier = "multi-jvm-end"
def start(className: String, count: Int) = barrier(StartBarrier, count, className)
def end(className: String, count: Int) = barrier(EndBarrier, count, className)
def barrier(name: String, count: Int, className: String, timeout: Duration = FileBasedBarrier.DefaultTimeout) = {
val Array(testName, nodeName) = className split TestMarker
val barrier = if (AkkaRemoteSpec.testNodes eq null)
new FileBasedBarrier(name, count, testName, nodeName, timeout)
else
new ZkClient.ZkBarrier(nodeName, count, "/" + testName + "_" + name)
barrier.await()
}
}

View file

@ -1,65 +0,0 @@
package akka.remote
import akka.actor.{ Actor, ActorRef, Props }
import akka.testkit._
import akka.dispatch.Await
import akka.pattern.ask
object NewRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec {
override def NrOfNodes = 2
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" sender ! self
}
}
import com.typesafe.config.ConfigFactory
override def commonConfig = ConfigFactory.parseString("""
akka {
loglevel = "WARNING"
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/service-hello.remote = %s
}
}
}""" format akkaURIs(1))
}
class NewRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(NewRemoteActorMultiJvmSpec.nodeConfigs(0)) {
import NewRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("done")
}
}
}
class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(NewRemoteActorMultiJvmSpec.nodeConfigs(1)) with DefaultTimeout {
import NewRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"A new remote actor" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
barrier("start")
val actor = system.actorOf(Props[SomeActor], "service-hello")
Await.result(actor ? "identify", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort must equal(akkaSpec(0))
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
barrier("done")
}
}
}

View file

@ -1,110 +0,0 @@
package akka.remote
import akka.actor.{ Actor, ActorRef, Props }
import akka.routing._
import akka.testkit.DefaultTimeout
import akka.dispatch.Await
import akka.pattern.ask
object RandomRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec {
override def NrOfNodes = 4
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" sender ! self
case "end" context.stop(self)
}
}
import com.typesafe.config.ConfigFactory
override def commonConfig = ConfigFactory.parseString("""
akka {
loglevel = "WARNING"
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/service-hello.router = "random"
/service-hello.nr-of-instances = %d
/service-hello.target.nodes = [%s]
}
}
}""" format (3, akkaURIs(3)))
}
class RandomRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) {
import RandomRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("broadcast-end")
barrier("end")
barrier("done")
}
}
}
class RandomRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) {
import RandomRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("broadcast-end")
barrier("end")
barrier("done")
}
}
}
class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(2)) {
import RandomRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("broadcast-end")
barrier("end")
barrier("done")
}
}
}
class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RandomRoutedRemoteActorMultiJvmSpec.nodeConfigs(3)) with DefaultTimeout {
import RandomRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"A new remote actor configured with a Random router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
barrier("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true)
val connectionCount = NrOfNodes - 1
val iterationCount = 10
var replies = Map(
akkaSpec(0) -> 0,
akkaSpec(1) -> 0,
akkaSpec(2) -> 0)
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
val nodeName = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort
replies = replies + (nodeName -> (replies(nodeName) + 1))
}
}
barrier("broadcast-end")
actor ! Broadcast("end")
barrier("end")
replies.values foreach { _ must be > (0) }
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
barrier("done")
}
}
}

View file

@ -1,112 +0,0 @@
package akka.remote
import akka.actor.{ Actor, ActorRef, Props }
import akka.routing._
import akka.testkit.DefaultTimeout
import akka.dispatch.Await
import akka.pattern.ask
object RoundRobinRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec {
override def NrOfNodes = 4
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" sender ! self
case "end" context.stop(self)
}
}
import com.typesafe.config.ConfigFactory
override def commonConfig = ConfigFactory.parseString("""
akka {
loglevel = "WARNING"
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/service-hello.router = "round-robin"
/service-hello.nr-of-instances = %d
/service-hello.target.nodes = [%s]
}
}
}""" format (3, akkaURIs(3)))
}
class RoundRobinRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) {
import RoundRobinRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("broadcast-end")
barrier("end")
barrier("done")
}
}
}
class RoundRobinRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) {
import RoundRobinRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("broadcast-end")
barrier("end")
barrier("done")
}
}
}
class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(2)) {
import RoundRobinRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("broadcast-end")
barrier("end")
barrier("done")
}
}
}
class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RoundRobinRoutedRemoteActorMultiJvmSpec.nodeConfigs(3)) with DefaultTimeout {
import RoundRobinRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"A new remote actor configured with a RoundRobin router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
barrier("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true)
val connectionCount = NrOfNodes - 1
val iterationCount = 10
var replies = Map(
akkaSpec(0) -> 0,
akkaSpec(1) -> 0,
akkaSpec(2) -> 0)
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
val nodeName = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort
replies = replies + (nodeName -> (replies(nodeName) + 1))
}
}
barrier("broadcast-end")
actor ! Broadcast("end")
barrier("end")
replies.values foreach { _ must be(10) }
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
barrier("done")
}
}
}

View file

@ -1,107 +0,0 @@
package akka.remote
import akka.actor.{ Actor, ActorRef, Props }
import akka.routing._
import akka.testkit._
import akka.util.duration._
object ScatterGatherRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec {
override def NrOfNodes = 4
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" sender ! self
case "end" context.stop(self)
}
}
import com.typesafe.config.ConfigFactory
override def commonConfig = ConfigFactory.parseString("""
akka {
loglevel = "WARNING"
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/service-hello.router = "scatter-gather"
/service-hello.nr-of-instances = %d
/service-hello.target.nodes = [%s]
}
}
}""" format (3, akkaURIs(3)))
}
class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(0)) {
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("broadcast-end")
barrier("end")
barrier("done")
}
}
}
class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(1)) {
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("broadcast-end")
barrier("end")
barrier("done")
}
}
}
class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(2)) {
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"___" must {
"___" in {
barrier("start")
barrier("broadcast-end")
barrier("end")
barrier("done")
}
}
}
class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec.nodeConfigs(3))
with DefaultTimeout with ImplicitSender {
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"A new remote actor configured with a ScatterGather router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
barrier("start")
val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true)
val connectionCount = NrOfNodes - 1
val iterationCount = 10
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
actor ! "hit"
}
}
val replies = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
case ref: ActorRef (ref.path.address.hostPort, 1)
}).foldLeft(Map(akkaSpec(0) -> 0, akkaSpec(1) -> 0, akkaSpec(2) -> 0)) {
case (m, (n, c)) m + (n -> (m(n) + c))
}
barrier("broadcast-end")
actor ! Broadcast("end")
barrier("end")
replies.values.sum must be === connectionCount * iterationCount
barrier("done")
}
}
}

View file

@ -1,71 +0,0 @@
/**
* Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
*/
package akka.remote
import org.apache.zookeeper._
import ZooDefs.Ids
object ZkClient extends Watcher {
// Don't forget to close!
lazy val zk: ZooKeeper = {
val remoteNodes = AkkaRemoteSpec.testNodes split ','
// ZkServers are configured to listen on a specific port.
val connectString = remoteNodes map (_+":2181") mkString ","
new ZooKeeper(connectString, 3000, this)
}
def process(ev: WatchedEvent) {
synchronized { notify() }
}
class ZkBarrier(name: String, count: Int, root: String) extends Barrier {
@annotation.tailrec
private def waitForServer() {
// SI-1672
val r = try {
zk.exists("/", false)
true
} catch {
case _: KeeperException.ConnectionLossException =>
Thread.sleep(10000)
false
}
if (!r) waitForServer()
}
waitForServer()
try zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) catch {
case _: KeeperException.NodeExistsException =>
}
val timeoutMs = 300*1000
private def block(num: Int) {
val start = System.currentTimeMillis
while (true) {
if (System.currentTimeMillis - start > timeoutMs) throw new InterruptedException("Timed out blocking in zk")
ZkClient.this.synchronized {
val children = zk.getChildren(root, true)
if (children.size < num) {
ZkClient.this.wait(timeoutMs)
} else
return
}
}
}
def enter() {
zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
block(count)
}
final def leave() {
zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
block(2*count)
}
}
def barrier(name: String, count: Int, root: String) = new ZkBarrier(name, count, root)
}

View file

@ -82,23 +82,17 @@ object AkkaBuild extends Build {
id = "akka-remote",
base = file("akka-remote"),
dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"),
settings = defaultSettings ++ multiJvmSettings ++ OSGi.remote ++ Seq(
settings = defaultSettings ++ OSGi.remote ++ Seq(
libraryDependencies ++= Dependencies.remote,
// disable parallel tests
parallelExecution in Test := false,
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
},
scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions,
jvmOptions in MultiJvm := defaultMultiJvmOptions,
test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x }
parallelExecution in Test := false
)
) configs (MultiJvm)
)
lazy val remoteTests = Project(
id = "akka-remote-tests",
base = file("akka-remote-tests"),
dependencies = Seq(remote % "compile;test->test;multi-jvm->multi-jvm", actorTests % "test->test", testkit % "test->test"),
dependencies = Seq(remote, actorTests % "test->test", testkit % "test->test"),
settings = defaultSettings ++ multiJvmSettings ++ Seq(
// disable parallel tests
parallelExecution in Test := false,
@ -415,8 +409,7 @@ object Dependencies {
)
val remote = Seq(
netty, protobuf, Test.junit, Test.scalatest,
Test.zookeeper, Test.log4j // needed for ZkBarrier in multi-jvm tests
netty, protobuf, Test.junit, Test.scalatest
)
val cluster = Seq(Test.junit, Test.scalatest)
@ -482,8 +475,6 @@ object Dependency {
val scalatest = "org.scalatest" % "scalatest_2.9.1" % V.Scalatest % "test" // ApacheV2
val scalacheck = "org.scala-tools.testing" % "scalacheck_2.9.1" % "1.9" % "test" // New BSD
val specs2 = "org.specs2" % "specs2_2.9.1" % "1.9" % "test" // Modified BSD / ApacheV2
val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % "3.4.0" % "test" // ApacheV2
val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2
}
}