This commit is contained in:
Peter Veentjer 2011-08-02 09:52:39 +03:00
parent 02aeec6b57
commit 320ee3cb4c
75 changed files with 755 additions and 395 deletions

View file

@ -108,6 +108,9 @@ class InvalidMessageException private[akka] (message: String, cause: Throwable =
* This message is thrown by default when an Actors behavior doesn't match a message
*/
case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exception {
def this(msg: String) = this(msg, null)
// constructor with 'null' ActorRef needed to work with client instantiation of remote exception
override def getMessage =
if (ref ne null) "Actor %s does not handle [%s]".format(ref, msg)

View file

@ -1430,6 +1430,7 @@ class DefaultClusterNode private[akka] (
}
private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress) {
EventHandler.info(this, "failOverClusterActorRef from %s to %s".format(from, to))
clusterActorRefs.values(from) foreach (_.failOver(from, to))
}

View file

@ -15,6 +15,7 @@ import com.eaio.uuid.UUID
import collection.immutable.Map
import annotation.tailrec
import akka.routing.Router
import akka.event.EventHandler
/**
* ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor
@ -57,6 +58,8 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = {
EventHandler.info(this, "ClusterActorRef. %s failover from %s to %s".format(address, from, to))
@tailrec
def doFailover(from: InetSocketAddress, to: InetSocketAddress): Unit = {
val oldValue = inetSocketAddressToActorRefMap.get
@ -93,6 +96,8 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
def signalDeadActor(ref: ActorRef): Unit = {
EventHandler.info(this, "ClusterActorRef %s signalDeadActor %s".format(address, ref.address))
//since the number remote actor refs for a clustered actor ref is quite low, we can deal with the O(N) complexity
//of the following removal.
val map = inetSocketAddressToActorRefMap.get

View file

@ -1,42 +0,0 @@
package akka.cluster.routing.direct.bad_address
import akka.cluster.{ Cluster, MasterClusterTestNode }
import akka.actor.Actor
import akka.config.Config
object BadAddressDirectRoutingMultiJvmSpec {
val NrOfNodes = 1
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command: " + Config.nodename)
self.reply(Config.nodename)
}
}
}
}
class BadAddressDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
import BadAddressDirectRoutingMultiJvmSpec._
val testNodes = NrOfNodes
"node" must {
"participate in cluster" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}

View file

@ -1,3 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.home = "node:node2"

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.home = "node:node2"

View file

@ -0,0 +1,77 @@
package akka.cluster.routing.direct.failover
import akka.config.Config
import scala.Predef._
import akka.cluster.{ ClusterActorRef, Cluster, MasterClusterTestNode, ClusterTestNode }
import akka.actor.{ ActorInitializationException, Actor }
object FailoverDirectRoutingMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify"
println("The node received the 'identify' command: " + Config.nodename)
self.reply(Config.nodename)
case "die"
println("The node received the 'die' command: " + Config.nodename)
Cluster.node.shutdown
}
}
}
class FailoverDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
import FailoverDirectRoutingMultiJvmSpec._
val testNodes = NrOfNodes
"Direct Router" must {
"not yet be able to failover to another node" in {
println("==================================================================================================")
println(" FAILOVER DIRECT ROUTING")
println("==================================================================================================")
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello").start().asInstanceOf[ClusterActorRef]
println("retrieved identity was: " + (actor ? "identify").get)
(actor ? "identify").get must equal("node2")
actor ! "die"
Thread.sleep(4000)
try {
actor ! "identify"
fail()
} catch {
case e: ActorInitializationException
}
}
}
}
class FailoverDirectRoutingMultiJvmNode2 extends ClusterTestNode {
import FailoverDirectRoutingMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}

View file

@ -0,0 +1,59 @@
package akka.cluster.routing.direct.homenode
import akka.config.Config
import akka.actor.Actor
import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
import Cluster._
object HomeNodeMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" {
self.reply(Config.nodename)
}
}
}
}
class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
import HomeNodeMultiJvmSpec._
val testNodes = NrOfNodes
"___" must {
"___" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}
}
class HomeNodeMultiJvmNode2 extends ClusterTestNode {
import HomeNodeMultiJvmSpec._
"Direct Router: A Direct Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
val actorNode1 = Actor.actorOf[SomeActor]("service-node1").start()
val name1 = (actorNode1 ? "identify").get.asInstanceOf[String]
name1 must equal("node1")
val actorNode2 = Actor.actorOf[SomeActor]("service-node2").start()
val name2 = (actorNode2 ? "identify").get.asInstanceOf[String]
name2 must equal("node2")
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}
}

View file

@ -0,0 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "direct"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node2.router = "direct"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]

View file

@ -0,0 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "direct"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node2.router = "direct"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]

View file

@ -1,3 +0,0 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,66 +0,0 @@
package akka.cluster.routing.direct.multiple_replicas
import akka.actor.Actor
import akka.cluster.{ MasterClusterTestNode, Cluster, ClusterTestNode }
import akka.config.Config
object MultiReplicaDirectRoutingMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command: " + Config.nodename)
self.reply(Config.nodename)
}
}
}
}
class MultiReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
import MultiReplicaDirectRoutingMultiJvmSpec._
"when node send message to existing node using direct routing it" must {
"communicate with that node" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
//Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello")
actor.start()
//actor.start()
val name: String = (actor ? "identify").get.asInstanceOf[String]
println("The name of the actor was " + name)
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}
class MultiReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
import MultiReplicaDirectRoutingMultiJvmSpec._
val testNodes = NrOfNodes
"node" must {
"participate in cluster" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]

View file

@ -1,8 +1,8 @@
package akka.cluster.routing.direct.single_replica
package akka.cluster.routing.direct.normalusage
import akka.actor.Actor
import akka.config.Config
import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
import akka.cluster.{ ClusterActorRef, ClusterTestNode, MasterClusterTestNode, Cluster }
object SingleReplicaDirectRoutingMultiJvmSpec {
val NrOfNodes = 2
@ -19,7 +19,6 @@ object SingleReplicaDirectRoutingMultiJvmSpec {
}
}
}
}
class SingleReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
@ -28,24 +27,6 @@ class SingleReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
val testNodes = NrOfNodes
"when node send message to existing node using direct routing it" must {
"communicate with that node" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello").start()
actor.isRunning must be(true)
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}
class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
import SingleReplicaDirectRoutingMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
@ -57,3 +38,24 @@ class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
}
}
class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
import SingleReplicaDirectRoutingMultiJvmSpec._
"Direct Router: when node send message to existing node it" must {
"communicate with that node" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello").start().asInstanceOf[ClusterActorRef]
actor.isRunning must be(true)
val result = (actor ? "identify").get
result must equal("node1")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}

View file

@ -1,3 +0,0 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -0,0 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -0,0 +1,119 @@
package akka.cluster.routing.random.failover
import akka.config.Config
import akka.cluster._
import akka.actor.{ ActorRef, Actor }
import java.util.{ Collections, Set JSet }
object RandomFailoverMultiJvmSpec {
val NrOfNodes = 3
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command")
self.reply(Config.nodename)
}
case "shutdown" {
println("The node received the 'shutdown' command")
Cluster.node.shutdown()
}
}
}
}
class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode {
import RandomFailoverMultiJvmSpec._
def testNodes = NrOfNodes
def sleepSome() {
println("Starting sleep")
Thread.sleep(1000) //nasty.. but ok for now.
println("Finished doing sleep")
}
"Random: when routing fails" must {
"jump to another replica" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
// ============= the real testing =================
val actor = Actor.actorOf[SomeActor]("service-hello").asInstanceOf[ClusterActorRef]
val oldFoundConnections = identifyConnections(actor)
println("---------------------------- oldFoundConnections ------------------------")
println(oldFoundConnections)
//since we have replication factor 2
oldFoundConnections.size() must be(2)
//terminate a node
actor ! "shutdown"
sleepSome()
//this is where the system behaves unpredictable. From time to time it works... from time to time there
//all kinds of connection timeouts. So this test shows that there are problems. For the time being
//the test code has been deactivated to prevent causing problems.
val newFoundConnections = identifyConnections(actor)
println("---------------------------- newFoundConnections ------------------------")
println(newFoundConnections)
//it still must be 2 since a different node should have been used to failover to
newFoundConnections.size() must be(2)
//they are not disjoint since, there must be a single element that is in both
Collections.disjoint(newFoundConnections, oldFoundConnections) must be(false)
//but they should not be equal since the shutdown-node has been replaced by another one.
newFoundConnections.equals(oldFoundConnections) must be(false)
Cluster.node.shutdown()
}
}
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until NrOfNodes * 10) {
val value = (actor ? "identify").get.asInstanceOf[String]
set.add(value)
}
set
}
}
class RandomFailoverMultiJvmNode2 extends ClusterTestNode {
import RandomFailoverMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}
class RandomFailoverMultiJvmNode3 extends ClusterTestNode {
import RandomFailoverMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}

View file

@ -0,0 +1,8 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "random"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node1.clustered.replication-factor = 1
akka.actor.deployment.service-node2.router = "random"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-node2.clustered.replication-factor = 1

View file

@ -0,0 +1,8 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "random"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node1.clustered.replication-factor = 1
akka.actor.deployment.service-node2.router = "random"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-node2.clustered.replication-factor = 1

View file

@ -0,0 +1,59 @@
package akka.cluster.routing.random.homenode
import akka.config.Config
import akka.actor.Actor
import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
import Cluster._
object HomeNodeMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" {
self.reply(Config.nodename)
}
}
}
}
class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
import HomeNodeMultiJvmSpec._
val testNodes = NrOfNodes
"___" must {
"___" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}
}
class HomeNodeMultiJvmNode2 extends ClusterTestNode {
import HomeNodeMultiJvmSpec._
"Random Router: A Random Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
val actorNode1 = Actor.actorOf[SomeActor]("service-node1")
val nameNode1 = (actorNode1 ? "identify").get.asInstanceOf[String]
nameNode1 must equal("node1")
val actorNode2 = Actor.actorOf[SomeActor]("service-node2")
val nameNode2 = (actorNode2 ? "identify").get.asInstanceOf[String]
nameNode2 must equal("node2")
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}
}

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.random.replicationfactor_1
import akka.cluster._
import akka.cluster.Cluster._
import akka.actor._
import akka.config.Config
/**
* Test that if a single node is used with a random router with replication factor then the actor is instantiated
* on the single node.
*/
object Random1ReplicaMultiJvmSpec {
class HelloWorld extends Actor with Serializable {
def receive = {
case "Hello"
self.reply("World from node [" + Config.nodename + "]")
}
}
}
class Random1ReplicaMultiJvmNode1 extends MasterClusterTestNode {
import Random1ReplicaMultiJvmSpec._
val testNodes = 1
"Random Router: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
node.start()
var hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true)
hello must not equal (null)
val reply = (hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))
reply must equal("World from node [node1]")
node.shutdown()
}
}
}

View file

@ -1,5 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.replication-factor = 3

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.repliction-factor = 3

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.replication-factor = 3

View file

@ -0,0 +1,111 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.random.replicationfactor_3
import akka.cluster._
import akka.actor._
import akka.config.Config
import Cluster._
/**
* When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible
* for running actors, or will it be just a 'client' talking to the cluster.
*/
object Random3ReplicasMultiJvmSpec {
val NrOfNodes = 3
class HelloWorld extends Actor with Serializable {
def receive = {
case "Hello"
self.reply("World from node [" + Config.nodename + "]")
}
}
}
/**
* What is the purpose of this node? Is this just a node for the cluster to make use of?
*/
class Random3ReplicasMultiJvmNode1 extends MasterClusterTestNode {
import Random3ReplicasMultiJvmSpec._
def testNodes: Int = NrOfNodes
"___" must {
"___" in {
node.start()
//wait till node 1 has started.
barrier("begin", NrOfNodes).await()
barrier("end", NrOfNodes).await()
node.shutdown()
}
}
}
class Random3ReplicasMultiJvmNode2 extends ClusterTestNode {
import Random3ReplicasMultiJvmSpec._
import Cluster._
"Random: A cluster" must {
"distribute requests randomly" in {
node.start()
//wait till node 1 has started.
barrier("begin", NrOfNodes).await()
//check if the actorRef is the expected remoteActorRef.
var hello: ActorRef = null
hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true)
//todo: is there a reason to check for null again since it already has been done in the previous block.
hello must not equal (null)
val replies = collection.mutable.Map.empty[String, Int]
def count(reply: String) = {
if (replies.get(reply).isEmpty) replies.put(reply, 1)
else replies.put(reply, replies(reply) + 1)
}
for (i 0 until 1000) {
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from a node")))
}
assert(replies("World from node [node1]") > 100)
assert(replies("World from node [node2]") > 100)
assert(replies("World from node [node3]") > 100)
barrier("end", NrOfNodes).await()
node.shutdown()
}
}
}
class Random3ReplicasMultiJvmNode3 extends ClusterTestNode {
import Random3ReplicasMultiJvmSpec._
import Cluster._
"___" must {
"___" in {
node.start()
//wait till node 1 has started.
barrier("begin", NrOfNodes).await()
barrier("end", NrOfNodes).await()
node.shutdown()
}
}
}

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993

View file

@ -0,0 +1,120 @@
package akka.cluster.routing.roundrobin.failover
import akka.config.Config
import akka.cluster._
import akka.actor.{ ActorRef, Actor }
import java.util.{ Collections, Set JSet }
object RoundRobinFailoverMultiJvmSpec {
val NrOfNodes = 3
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command")
self.reply(Config.nodename)
}
case "shutdown" {
println("The node received the 'shutdown' command")
Cluster.node.shutdown()
}
}
}
}
class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
def testNodes = NrOfNodes
def sleepSome() {
println("Starting sleep")
Thread.sleep(1000) //nasty.. but ok for now.
println("Finished doing sleep")
}
"Round Robin: when round robin fails" must {
"jump to another replica" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
// ============= the real testing =================
val actor = Actor.actorOf[SomeActor]("service-hello").asInstanceOf[ClusterActorRef]
val oldFoundConnections = identifyConnections(actor)
println("---------------------------- oldFoundConnections ------------------------")
println(oldFoundConnections)
//since we have replication factor 2
oldFoundConnections.size() must be(2)
//terminate a node
actor ! "shutdown"
sleepSome()
//this is where the system behaves unpredictable. From time to time it works... from time to time there
//all kinds of connection timeouts. So this test shows that there are problems. For the time being
//the test code has been deactivated to prevent causing problems.
/*
val newFoundConnections = identifyConnections(actor)
println("---------------------------- newFoundConnections ------------------------")
println(newFoundConnections)
//it still must be 2 since a different node should have been used to failover to
newFoundConnections.size() must be(2)
//they are not disjoint since, there must be a single element that is in both
Collections.disjoint(newFoundConnections, oldFoundConnections) must be(false)
//but they should not be equal since the shutdown-node has been replaced by another one.
newFoundConnections.equals(oldFoundConnections) must be(false)
*/
Cluster.node.shutdown()
}
}
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until NrOfNodes * 2) {
val value = (actor ? "identify").get.asInstanceOf[String]
set.add(value)
}
set
}
}
class RoundRobinFailoverMultiJvmNode2 extends ClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}
class RoundRobinFailoverMultiJvmNode3 extends ClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}

View file

@ -0,0 +1,8 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "round-robin"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node1.clustered.replication-factor = 1
akka.actor.deployment.service-node2.router = "round-robin"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-node2.clustered.replication-factor = 1

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -1,7 +1,7 @@
package akka.cluster.routing.homenode
package akka.cluster.routing.roundrobin.homenode
import akka.config.Config
import akka.actor.{ ActorRef, Actor }
import akka.actor.Actor
import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
import Cluster._
@ -16,6 +16,7 @@ object HomeNodeMultiJvmSpec {
}
}
}
}
class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
@ -24,13 +25,12 @@ class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
val testNodes = NrOfNodes
"A Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
"___" must {
"___" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
@ -41,18 +41,21 @@ class HomeNodeMultiJvmNode2 extends ClusterTestNode {
import HomeNodeMultiJvmSpec._
"A Router" must {
"Round Robin: A Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
barrier("get-ref-to-actor-on-node2", NrOfNodes) {
val actor = Actor.actorOf[SomeActor]("service-hello")
val name = (actor ? "identify").get.asInstanceOf[String]
name must equal("node1")
}
val actorNode1 = Actor.actorOf[SomeActor]("service-node1")
val name1 = (actorNode1 ? "identify").get.asInstanceOf[String]
name1 must equal("node1")
val actorNode2 = Actor.actorOf[SomeActor]("service-node2")
val name2 = (actorNode2 ? "identify").get.asInstanceOf[String]
name2 must equal("node2")
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.roundrobin_1_replica
package akka.cluster.routing.roundrobin.replicationfactor_1
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@ -32,13 +32,12 @@ class RoundRobin1ReplicaMultiJvmNode1 extends MasterClusterTestNode {
val testNodes = 1
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
node.start()
var hello: ActorRef = null
hello = Actor.actorOf[HelloWorld]("service-hello")
var hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true)

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.roundrobin_2_replicas
package akka.cluster.routing.roundrobin.replicationfactor_2
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@ -39,7 +39,7 @@ object RoundRobin2ReplicasMultiJvmSpec {
class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import RoundRobin2ReplicasMultiJvmSpec._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
System.getProperty("akka.cluster.nodename", "") must be("node1")
@ -51,16 +51,16 @@ class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B
}
//wait till ndoe 2 has started.
barrier("start-node2", NrOfNodes) {}
barrier("start-node2", NrOfNodes).await()
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
barrier("start-node3", NrOfNodes).await()
//wait till an actor reference on node 2 has become available.
barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
//wait till the node 2 has send a message to the replica's.
barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
barrier("send-message-from-node2-to-replicas", NrOfNodes).await()
node.shutdown()
}
@ -78,14 +78,14 @@ class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B
class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
import RoundRobin2ReplicasMultiJvmSpec._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
System.getProperty("akka.cluster.nodename", "") must be("node2")
System.getProperty("akka.cluster.port", "") must be("9992")
//wait till node 1 has started.
barrier("start-node1", NrOfNodes) {}
barrier("start-node1", NrOfNodes).await()
//wait till node 2 has started.
barrier("start-node2", NrOfNodes) {
@ -93,7 +93,7 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
}
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
barrier("start-node3", NrOfNodes).await()
//check if the actorRef is the expected remoteActorRef.
var hello: ActorRef = null

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.roundrobin_3_replicas
package akka.cluster.routing.roundrobin.replicationfactor_3
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@ -35,7 +35,7 @@ object RoundRobin3ReplicasMultiJvmSpec {
class RoundRobin3ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import RoundRobin3ReplicasMultiJvmSpec._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" ignore {
@ -45,16 +45,16 @@ class RoundRobin3ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B
}
//wait till ndoe 2 has started.
barrier("start-node2", NrOfNodes) {}
barrier("start-node2", NrOfNodes).await()
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
barrier("start-node3", NrOfNodes).await()
//wait till an actor reference on node 2 has become available.
barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
//wait till the node 2 has send a message to the replica's.
barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
barrier("send-message-from-node2-to-replicas", NrOfNodes).await()
node.shutdown()
}
@ -73,12 +73,12 @@ class RoundRobin3ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
import RoundRobin3ReplicasMultiJvmSpec._
import Cluster._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" ignore {
//wait till node 1 has started.
barrier("start-node1", NrOfNodes) {}
barrier("start-node1", NrOfNodes).await()
//wait till node 2 has started.
barrier("start-node2", NrOfNodes) {
@ -86,7 +86,7 @@ class RoundRobin3ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
}
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
barrier("start-node3", NrOfNodes).await()
//check if the actorRef is the expected remoteActorRef.
var hello: ActorRef = null
@ -134,20 +134,20 @@ class RoundRobin3ReplicasMultiJvmNode3 extends WordSpec with MustMatchers {
import RoundRobin3ReplicasMultiJvmSpec._
import Cluster._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" ignore {
barrier("start-node1", NrOfNodes) {}
barrier("start-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {}
barrier("start-node2", NrOfNodes).await()
barrier("start-node3", NrOfNodes) {
node.start()
}
barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
barrier("send-message-from-node2-to-replicas", NrOfNodes).await()
node.shutdown()
}

View file

@ -1 +0,0 @@
-Dakka.cluster.nodename=node4 -Dakka.cluster.port=9994

View file

@ -1,144 +0,0 @@
package akka.cluster.routing.roundrobin_failover
import akka.config.Config
import akka.cluster._
import akka.actor.{ ActorRef, Actor }
object RoundRobinFailoverMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command")
self.reply(Config.nodename)
}
case "shutdown" {
println("The node received the 'shutdown' command")
Cluster.node.shutdown()
}
}
}
}
class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
val testNodes = NrOfNodes
"foo" must {
"bla" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Getting reference to service-hello actor")
var hello: ActorRef = null
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {
hello = Actor.actorOf[SomeActor]("service-hello")
}
println("Successfully acquired reference")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}
class RoundRobinFailoverMultiJvmNode2 extends ClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
"foo" must {
"bla" in {
println("Started Zookeeper Node")
Cluster.node.start()
println("Waiting to begin")
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Begin!")
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
// ============= the real testing =================
/*
val actor = Actor.actorOf[SomeActor]("service-hello")
val firstTimeResult = (actor ? "identify").get
val secondTimeResult = (actor ? "identify").get
//since there are only 2 nodes, the identity should not have changed.
assert(firstTimeResult == secondTimeResult)
//if we now terminate the node that
actor ! "shutdown"
//todo: do some waiting
println("Doing some sleep")
try {
Thread.sleep(4000) //nasty.. but ok for now.
println("Finished doing sleep")
} finally {
println("Ended the Thread.sleep method somehow..")
}
//now we should get a different node that responds to us since there was a failover.
val thirdTimeResult = (actor ? "identify").get
assert(!(firstTimeResult == thirdTimeResult)) */
// ==================================================
println("Waiting to end")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
println("Shutting down ClusterNode")
Cluster.node.shutdown()
}
}
}
/*
class RoundRobinFailoverMultiJvmNode3 extends SlaveNode {
import RoundRobinFailoverMultiJvmSpec._
"foo" must {
"bla" in {
println("Started Zookeeper Node")
Cluster.node.start()
println("Waiting to begin")
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Begin!")
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
println("Waiting to end")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
println("Shutting down ClusterNode")
Cluster.node.shutdown()
}
}
}
class RoundRobinFailoverMultiJvmNode4 extends SlaveNode {
import RoundRobinFailoverMultiJvmSpec._
"foo" must {
"bla" in {
println("Started Zookeeper Node")
Cluster.node.start()
println("Waiting to begin")
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Begin!")
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
println("Waiting to end")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
println("Shutting down ClusterNode")
Cluster.node.shutdown()
}
}
} */

View file

@ -1,6 +0,0 @@
What does clustered home mean?
akka.actor.deployment.service-hello.clustered.home = "node:node1"
If a node fails, it should transparently be redeployed on a different node. So actors imho are homeless.. they run
wherever the grid deploys them.

View file

@ -1,54 +0,0 @@
- It would be nice if the .conf files somehow could be integrated in the scala file
object SomeNode extends ClusterNodeWithConf{
def config() = "
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1"
}
}
- It would be nice if the .opts file somehow could be integrated in the scala file.
object SomeNode extends ClusterNodeWithOpts{
def opts() = -Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
}
- It should be transparent which node starts/stops the cluster. Perhaps some kind of 'before the world starts' and
'after the world ended' logic could be added. The consequence is that there are mixed responsibilities in a node.
- A node has the mixed responsibity of being part of the grid and doing checks. It would be nice if one could create
cluster nodes very easily (just spawn a jvm and everything will be copied on them) and if one could create 'client nodes'
that communicate with the grid and do their validations.
- Each node has been expressed in code, so it is very hard to either use a large number of nodes (lots of code) of to change
the number of nodes without changes all the code. It would be nice if one could say: I want 100 jvm instances with this
specification.
- There is a lot of waiting for each other, but it would be nice if each node could say this:
waitForGo.
so you get something like:
object God{
def beforeBegin(){
ZooKeeper.start()
}
def afterEnd{
ZooKeeper.stop()
}
}
class SomeNode extends ClusterTestNode{
"foo" must {
"bla" in {
waitForGo()
..now do testing logic.
}
}
}