diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index de81074303..badad3da42 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -344,7 +344,7 @@ class FutureSpec extends JUnitSuite { val x = Future("Hello") val y = x map (_.length) - val r = flow(x() + " " + y.map(_ / 0).map(_.toString)(), 100) + val r = flow(x() + " " + y.map(_ / 0).map(_.toString)(), 200) intercept[java.lang.ArithmeticException](r.get) } @@ -358,7 +358,7 @@ class FutureSpec extends JUnitSuite { val x = Future(3) val y = (actor ? "Hello").mapTo[Int] - val r = flow(x() + y(), 100) + val r = flow(x() + y(), 200) intercept[ClassCastException](r.get) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index a29eea6798..6550a13a7e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -452,9 +452,6 @@ class LocalActorRef private[akka] ( case _ ⇒ true } - // FIXME how to get the matching serializerClassName? Now default is used. Needed for transaction log snapshot - // private val serializer = Actor.serializerFor(address, Format.defaultSerializerName) - def serializerErrorDueTo(reason: String) = throw new akka.config.ConfigurationException( "Could not create Serializer object [" + this.getClass.getName + diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index 83f712ae54..fba4a1e52a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -84,11 +84,18 @@ class ClusterActorRef private[akka] ( if (_status == ActorRefInternals.RUNNING) { _status = ActorRefInternals.SHUTDOWN postMessageToMailbox(RemoteActorSystemMessage.Stop, None) + + // FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket) + + inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections } } } + // ======================================================================== // ==== NOT SUPPORTED ==== + // ======================================================================== + // FIXME move these methods and the same ones in RemoteActorRef to a base class - now duplicated def dispatcher_=(md: MessageDispatcher) { unsupported @@ -136,5 +143,5 @@ class ClusterActorRef private[akka] ( protected[akka] def actorInstance: AtomicReference[Actor] = unsupported - private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") + private def unsupported = throw new UnsupportedOperationException("Not supported for ClusterActorRef") } diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf index 762f32d92a..7d8a1476ad 100644 --- a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf @@ -1,2 +1,4 @@ akka.enabled-modules = ["cluster"] akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf index 762f32d92a..7d8a1476ad 100644 --- a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf @@ -1,2 +1,4 @@ akka.enabled-modules = ["cluster"] akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf index 762f32d92a..7d8a1476ad 100644 --- a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf @@ -1,2 +1,4 @@ akka.enabled-modules = ["cluster"] akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala index f5a39a33d6..82f240a9df 100644 --- a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala @@ -1,5 +1,5 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB +/* + * Copyright (C) 2009-2011 Scalable Solutions AB */ package akka.cluster.api.migration.automatic @@ -14,123 +14,116 @@ import Cluster._ import akka.config.Config import akka.serialization.Serialization -/** - * Tests automatic transparent migration of an actor from node1 to node2 and then from node2 to node3. - * - * object MigrationAutomaticMultiJvmSpec { - * var NrOfNodes = 3 - * - * class HelloWorld extends Actor with Serializable { - * def receive = { - * case "Hello" ⇒ - * self.reply("World from node [" + Config.nodename + "]") - * } - * } - * } - * - * class MigrationAutomaticMultiJvmNode1 extends ClusterTestNode { - * import MigrationAutomaticMultiJvmSpec._ - * - * "A cluster" must { - * - * "be able to migrate an actor from one node to another" in { - * - * barrier("start-node1", NrOfNodes) { - * node.start() - * } - * - * barrier("store-actor-in-node1", NrOfNodes) { - * val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x ⇒ fail("No serializer found"), s ⇒ s) - * node.store("hello-world", classOf[HelloWorld], 1, serializer) - * node.isInUseOnNode("hello-world") must be(true) - * val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) - * actorRef.address must be("hello-world") - * (actorRef ? "Hello").as[String].get must be("World from node [node1]") - * } - * - * barrier("start-node2", NrOfNodes) { - * } - * - * node.shutdown() - * } - * } - * } - * - * class MigrationAutomaticMultiJvmNode2 extends ClusterTestNode { - * import MigrationAutomaticMultiJvmSpec._ - * - * var isFirstReplicaNode = false - * - * "A cluster" must { - * - * "be able to migrate an actor from one node to another" in { - * - * barrier("start-node1", NrOfNodes) { - * } - * - * barrier("store-actor-in-node1", NrOfNodes) { - * } - * - * barrier("start-node2", NrOfNodes) { - * node.start() - * } - * - * Thread.sleep(2000) // wait for fail-over from node1 to node2 - * - * barrier("check-fail-over-to-node2", NrOfNodes - 1) { - * // both remaining nodes should now have the replica - * node.isInUseOnNode("hello-world") must be(true) - * val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) - * actorRef.address must be("hello-world") - * (actorRef ? "Hello").as[String].get must be("World from node [node2]") - * } - * - * barrier("start-node3", NrOfNodes - 1) { - * } - * - * node.shutdown() - * } - * } - * } - * - * class MigrationAutomaticMultiJvmNode3 extends MasterClusterTestNode { - * import MigrationAutomaticMultiJvmSpec._ - * - * val testNodes = NrOfNodes - * - * "A cluster" must { - * - * "be able to migrate an actor from one node to another" in { - * - * barrier("start-node1", NrOfNodes) { - * } - * - * barrier("store-actor-in-node1", NrOfNodes) { - * } - * - * barrier("start-node2", NrOfNodes) { - * } - * - * barrier("check-fail-over-to-node2", NrOfNodes - 1) { - * } - * - * barrier("start-node3", NrOfNodes - 1) { - * node.start() - * } - * - * Thread.sleep(2000) // wait for fail-over from node2 to node3 - * - * barrier("check-fail-over-to-node3", NrOfNodes - 2) { - * // both remaining nodes should now have the replica - * node.isInUseOnNode("hello-world") must be(true) - * val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) - * actorRef.address must be("hello-world") - * (actorRef ? "Hello").as[String].get must be("World from node [node3]") - * } - * - * node.shutdown() - * } - * } - * } - * - */ +object MigrationAutomaticMultiJvmSpec { + var NrOfNodes = 3 + + class HelloWorld extends Actor with Serializable { + def receive = { + case "Hello" ⇒ + self.reply("World from node [" + Config.nodename + "]") + } + } +} + +class MigrationAutomaticMultiJvmNode1 extends ClusterTestNode { + import MigrationAutomaticMultiJvmSpec._ + + "A cluster" must { + + "be able to migrate an actor from one node to another" in { + + barrier("start-node1", NrOfNodes) { + node.start() + } + + barrier("create-actor-on-node1", NrOfNodes) { + val actorRef = Actor.actorOf[HelloWorld]("hello-world").start() + node.isInUseOnNode("hello-world") must be(true) + actorRef.address must be("hello-world") + (actorRef ? "Hello").as[String].get must be("World from node [node1]") + } + + barrier("start-node2", NrOfNodes) { + } + + node.shutdown() + } + } +} + +class MigrationAutomaticMultiJvmNode2 extends ClusterTestNode { + import MigrationAutomaticMultiJvmSpec._ + + var isFirstReplicaNode = false + + "A cluster" must { + + "be able to migrate an actor from one node to another" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + node.start() + } + + Thread.sleep(2000) // wait for fail-over from node1 to node2 + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? "Hello").as[String].get must be("World from node [node2]") + } + + barrier("start-node3", NrOfNodes - 1) { + } + + node.shutdown() + } + } +} + +class MigrationAutomaticMultiJvmNode3 extends MasterClusterTestNode { + import MigrationAutomaticMultiJvmSpec._ + + val testNodes = NrOfNodes + + "A cluster" must { + + "be able to migrate an actor from one node to another" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + } + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + } + + barrier("start-node3", NrOfNodes - 1) { + node.start() + } + + Thread.sleep(2000) // wait for fail-over from node2 to node3 + + barrier("check-fail-over-to-node3", NrOfNodes - 2) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? "Hello").as[String].get must be("World from node [node3]") + } + + node.shutdown() + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala index 06e201497c..e715571a21 100644 --- a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala @@ -17,7 +17,7 @@ import akka.config.Config import akka.serialization.Serialization import java.util.concurrent._ -/* + object MigrationExplicitMultiJvmSpec { var NrOfNodes = 2 @@ -108,4 +108,3 @@ class MigrationExplicitMultiJvmNode2 extends ClusterTestNode { } } } -*/ \ No newline at end of file