1. Fixed problems with actor fail-over migration.

2. Readded the tests for explicit and automatic migration
3. Fixed timeout issue in FutureSpec

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-07-05 18:44:10 +02:00
parent 2d4cb40557
commit 95dbd425c4
8 changed files with 132 additions and 130 deletions

View file

@ -344,7 +344,7 @@ class FutureSpec extends JUnitSuite {
val x = Future("Hello") val x = Future("Hello")
val y = x map (_.length) val y = x map (_.length)
val r = flow(x() + " " + y.map(_ / 0).map(_.toString)(), 100) val r = flow(x() + " " + y.map(_ / 0).map(_.toString)(), 200)
intercept[java.lang.ArithmeticException](r.get) intercept[java.lang.ArithmeticException](r.get)
} }
@ -358,7 +358,7 @@ class FutureSpec extends JUnitSuite {
val x = Future(3) val x = Future(3)
val y = (actor ? "Hello").mapTo[Int] val y = (actor ? "Hello").mapTo[Int]
val r = flow(x() + y(), 100) val r = flow(x() + y(), 200)
intercept[ClassCastException](r.get) intercept[ClassCastException](r.get)
} }

View file

@ -452,9 +452,6 @@ class LocalActorRef private[akka] (
case _ true case _ true
} }
// FIXME how to get the matching serializerClassName? Now default is used. Needed for transaction log snapshot
// private val serializer = Actor.serializerFor(address, Format.defaultSerializerName)
def serializerErrorDueTo(reason: String) = def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException( throw new akka.config.ConfigurationException(
"Could not create Serializer object [" + this.getClass.getName + "Could not create Serializer object [" + this.getClass.getName +

View file

@ -84,11 +84,18 @@ class ClusterActorRef private[akka] (
if (_status == ActorRefInternals.RUNNING) { if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN _status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None) postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
// FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket)
inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections
} }
} }
} }
// ========================================================================
// ==== NOT SUPPORTED ==== // ==== NOT SUPPORTED ====
// ========================================================================
// FIXME move these methods and the same ones in RemoteActorRef to a base class - now duplicated // FIXME move these methods and the same ones in RemoteActorRef to a base class - now duplicated
def dispatcher_=(md: MessageDispatcher) { def dispatcher_=(md: MessageDispatcher) {
unsupported unsupported
@ -136,5 +143,5 @@ class ClusterActorRef private[akka] (
protected[akka] def actorInstance: AtomicReference[Actor] = unsupported protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") private def unsupported = throw new UnsupportedOperationException("Not supported for ClusterActorRef")
} }

View file

@ -1,2 +1,4 @@
akka.enabled-modules = ["cluster"] akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG" akka.event-handler-level = "DEBUG"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,2 +1,4 @@
akka.enabled-modules = ["cluster"] akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG" akka.event-handler-level = "DEBUG"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,2 +1,4 @@
akka.enabled-modules = ["cluster"] akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG" akka.event-handler-level = "DEBUG"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
/** /*
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/ */
package akka.cluster.api.migration.automatic package akka.cluster.api.migration.automatic
@ -14,123 +14,116 @@ import Cluster._
import akka.config.Config import akka.config.Config
import akka.serialization.Serialization import akka.serialization.Serialization
/** object MigrationAutomaticMultiJvmSpec {
* Tests automatic transparent migration of an actor from node1 to node2 and then from node2 to node3. var NrOfNodes = 3
*
* object MigrationAutomaticMultiJvmSpec { class HelloWorld extends Actor with Serializable {
* var NrOfNodes = 3 def receive = {
* case "Hello"
* class HelloWorld extends Actor with Serializable { self.reply("World from node [" + Config.nodename + "]")
* def receive = { }
* case "Hello" }
* self.reply("World from node [" + Config.nodename + "]") }
* }
* } class MigrationAutomaticMultiJvmNode1 extends ClusterTestNode {
* } import MigrationAutomaticMultiJvmSpec._
*
* class MigrationAutomaticMultiJvmNode1 extends ClusterTestNode { "A cluster" must {
* import MigrationAutomaticMultiJvmSpec._
* "be able to migrate an actor from one node to another" in {
* "A cluster" must {
* barrier("start-node1", NrOfNodes) {
* "be able to migrate an actor from one node to another" in { node.start()
* }
* barrier("start-node1", NrOfNodes) {
* node.start() barrier("create-actor-on-node1", NrOfNodes) {
* } val actorRef = Actor.actorOf[HelloWorld]("hello-world").start()
* node.isInUseOnNode("hello-world") must be(true)
* barrier("store-actor-in-node1", NrOfNodes) { actorRef.address must be("hello-world")
* val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x fail("No serializer found"), s s) (actorRef ? "Hello").as[String].get must be("World from node [node1]")
* node.store("hello-world", classOf[HelloWorld], 1, serializer) }
* node.isInUseOnNode("hello-world") must be(true)
* val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) barrier("start-node2", NrOfNodes) {
* actorRef.address must be("hello-world") }
* (actorRef ? "Hello").as[String].get must be("World from node [node1]")
* } node.shutdown()
* }
* barrier("start-node2", NrOfNodes) { }
* } }
*
* node.shutdown() class MigrationAutomaticMultiJvmNode2 extends ClusterTestNode {
* } import MigrationAutomaticMultiJvmSpec._
* }
* } var isFirstReplicaNode = false
*
* class MigrationAutomaticMultiJvmNode2 extends ClusterTestNode { "A cluster" must {
* import MigrationAutomaticMultiJvmSpec._
* "be able to migrate an actor from one node to another" in {
* var isFirstReplicaNode = false
* barrier("start-node1", NrOfNodes) {
* "A cluster" must { }
*
* "be able to migrate an actor from one node to another" in { barrier("create-actor-on-node1", NrOfNodes) {
* }
* barrier("start-node1", NrOfNodes) {
* } barrier("start-node2", NrOfNodes) {
* node.start()
* barrier("store-actor-in-node1", NrOfNodes) { }
* }
* Thread.sleep(2000) // wait for fail-over from node1 to node2
* barrier("start-node2", NrOfNodes) {
* node.start() barrier("check-fail-over-to-node2", NrOfNodes - 1) {
* } // both remaining nodes should now have the replica
* node.isInUseOnNode("hello-world") must be(true)
* Thread.sleep(2000) // wait for fail-over from node1 to node2 val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
* actorRef.address must be("hello-world")
* barrier("check-fail-over-to-node2", NrOfNodes - 1) { (actorRef ? "Hello").as[String].get must be("World from node [node2]")
* // both remaining nodes should now have the replica }
* node.isInUseOnNode("hello-world") must be(true)
* val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) barrier("start-node3", NrOfNodes - 1) {
* actorRef.address must be("hello-world") }
* (actorRef ? "Hello").as[String].get must be("World from node [node2]")
* } node.shutdown()
* }
* barrier("start-node3", NrOfNodes - 1) { }
* } }
*
* node.shutdown() class MigrationAutomaticMultiJvmNode3 extends MasterClusterTestNode {
* } import MigrationAutomaticMultiJvmSpec._
* }
* } val testNodes = NrOfNodes
*
* class MigrationAutomaticMultiJvmNode3 extends MasterClusterTestNode { "A cluster" must {
* import MigrationAutomaticMultiJvmSpec._
* "be able to migrate an actor from one node to another" in {
* val testNodes = NrOfNodes
* barrier("start-node1", NrOfNodes) {
* "A cluster" must { }
*
* "be able to migrate an actor from one node to another" in { barrier("create-actor-on-node1", NrOfNodes) {
* }
* barrier("start-node1", NrOfNodes) {
* } barrier("start-node2", NrOfNodes) {
* }
* barrier("store-actor-in-node1", NrOfNodes) {
* } barrier("check-fail-over-to-node2", NrOfNodes - 1) {
* }
* barrier("start-node2", NrOfNodes) {
* } barrier("start-node3", NrOfNodes - 1) {
* node.start()
* barrier("check-fail-over-to-node2", NrOfNodes - 1) { }
* }
* Thread.sleep(2000) // wait for fail-over from node2 to node3
* barrier("start-node3", NrOfNodes - 1) {
* node.start() barrier("check-fail-over-to-node3", NrOfNodes - 2) {
* } // both remaining nodes should now have the replica
* node.isInUseOnNode("hello-world") must be(true)
* Thread.sleep(2000) // wait for fail-over from node2 to node3 val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
* actorRef.address must be("hello-world")
* barrier("check-fail-over-to-node3", NrOfNodes - 2) { (actorRef ? "Hello").as[String].get must be("World from node [node3]")
* // both remaining nodes should now have the replica }
* node.isInUseOnNode("hello-world") must be(true)
* val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) node.shutdown()
* actorRef.address must be("hello-world") }
* (actorRef ? "Hello").as[String].get must be("World from node [node3]") }
* } }
*
* node.shutdown()
* }
* }
* }
*
*/

View file

@ -17,7 +17,7 @@ import akka.config.Config
import akka.serialization.Serialization import akka.serialization.Serialization
import java.util.concurrent._ import java.util.concurrent._
/*
object MigrationExplicitMultiJvmSpec { object MigrationExplicitMultiJvmSpec {
var NrOfNodes = 2 var NrOfNodes = 2
@ -108,4 +108,3 @@ class MigrationExplicitMultiJvmNode2 extends ClusterTestNode {
} }
} }
} }
*/