diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 0fe26f921c..3ca8fe5f58 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -240,6 +240,13 @@ trait ClusterNode { */ def store(actorRef: ActorRef, serializer: Serializer): ClusterNode + /** + * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store(actorRef: ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode + /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly @@ -261,13 +268,6 @@ trait ClusterNode { */ def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store(actorRef: ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode - /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index f6a474e0b4..4f26881d56 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -532,6 +532,14 @@ class DefaultClusterNode private[akka] ( def store(actorRef: ActorRef, serializer: Serializer): ClusterNode = store(actorRef, 0, Transient, false, serializer) + /** + * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store(actorRef: ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode = + store(actorRef, 0, Transient, serializeMailbox, serializer) + /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly @@ -556,14 +564,6 @@ class DefaultClusterNode private[akka] ( def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode = store(actorRef, replicationFactor, replicationScheme, false, serializer) - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store(actorRef: ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode = - store(actorRef, 0, Transient, serializeMailbox, serializer) - /** * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 6ed9deb3db..9699be171f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -42,67 +42,6 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with "A ClusterNode" should { - "be able to cluster an actor by ActorRef" in { - // create actor - val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start - - val node = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "cluster-actor-1", port = 9001)) - node.start - - // register actor - import BinaryFormatMyJavaSerializableActor._ - var serializeMailbox = true - node.store(actorRef, serializeMailbox) - - node.isClustered(actorRef.address) must be(true) - node.uuidsForClusteredActors.exists(_ == actorRef.uuid) must be(true) - - node.stop - } - - "be able to remove an actor by actor uuid" in { - // create actor - val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start - - val node = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "remove-actor-uuid", port = 9001)) - node.start - - // register actor - import BinaryFormatMyJavaSerializableActor._ - var serializeMailbox = true - node.store(actorRef, serializeMailbox) - - node.uuidsForClusteredActors.exists(_ == actorRef.uuid) must be(true) - - // deregister actor - node.remove(actorRef.uuid) - node.uuidsForClusteredActors.exists(_ == actorRef.uuid) must be(false) - - node.stop - } - - "be able to remove an actor by actor address" in { - // create actor - val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start - - val node = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "remove-actor-id", port = 9001)) - node.start - - // register actor - import BinaryFormatMyJavaSerializableActor._ - var serializeMailbox = true - node.store(actorRef, serializeMailbox) - - node.isClustered(actorRef.address) must be(true) - node.addressesForClusteredActors.exists(_ == actorRef.address) must be(true) - - // deregister actor - node.remove(actorRef.address) - node.addressesForClusteredActors.exists(_ == actorRef.address) must be(false) - - node.stop - } - "be able to use an actor by actor address" in { val node = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "use-actor-id", port = 9001)) node.start diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmSpec.scala index 6a29ea6f09..b4c19b99d2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmSpec.scala @@ -21,12 +21,23 @@ import java.util.concurrent._ object RegistryStoreMultiJvmSpec { var NrOfNodes = 2 - class HelloWorld extends Actor with Serializable { + class HelloWorld1 extends Actor with Serializable { def receive = { case "Hello" ⇒ self.reply("World from node [" + Config.nodename + "]") } } + + class HelloWorld2 extends Actor with Serializable { + var counter = 0 + def receive = { + case "Hello" ⇒ + Thread.sleep(1000) + counter += 1 + case "Count" ⇒ + self.reply(counter) + } + } } class RegistryStoreMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { @@ -43,12 +54,34 @@ class RegistryStoreMultiJvmNode1 extends WordSpec with MustMatchers with BeforeA barrier("start-node-2", NrOfNodes) { } - barrier("store-in-node-1", NrOfNodes) { - val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x ⇒ fail("No serializer found"), s ⇒ s) - node.store(actorOf[HelloWorld]("hello-world-1"), serializer) + barrier("store-1-in-node-1", NrOfNodes) { + val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x ⇒ fail("No serializer found"), s ⇒ s) + node.store(actorOf[HelloWorld1]("hello-world-1"), serializer) } - barrier("use-in-node-2", NrOfNodes) { + barrier("use-1-in-node-2", NrOfNodes) { + } + + barrier("store-2-in-node-1", NrOfNodes) { + val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x ⇒ fail("No serializer found"), s ⇒ s) + node.store("hello-world-2", classOf[HelloWorld1], false, serializer) + } + + barrier("use-2-in-node-2", NrOfNodes) { + } + + barrier("store-3-in-node-1", NrOfNodes) { + val serializer = Serialization.serializerFor(classOf[HelloWorld2]).fold(x ⇒ fail("No serializer found"), s ⇒ s) + val actor = actorOf[HelloWorld2]("hello-world-3").start + actor ! "Hello" + actor ! "Hello" + actor ! "Hello" + actor ! "Hello" + actor ! "Hello" + node.store(actor, true, serializer) + } + + barrier("use-3-in-node-2", NrOfNodes) { } node.shutdown() @@ -78,14 +111,43 @@ class RegistryStoreMultiJvmNode2 extends WordSpec with MustMatchers { node.start() } - barrier("store-in-node-1", NrOfNodes) { + barrier("store-1-in-node-1", NrOfNodes) { } - barrier("use-in-node-2", NrOfNodes) { + barrier("use-1-in-node-2", NrOfNodes) { val actorOrOption = node.use("hello-world-1") if (actorOrOption.isEmpty) fail("Actor could not be retrieved") + val actorRef = actorOrOption.get actorRef.address must be("hello-world-1") + + (actorRef ? "Hello").as[String].get must be("World from node [node2]") + } + + barrier("store-2-in-node-1", NrOfNodes) { + } + + barrier("use-2-in-node-2", NrOfNodes) { + val actorOrOption = node.use("hello-world-2") + if (actorOrOption.isEmpty) fail("Actor could not be retrieved") + + val actorRef = actorOrOption.get + actorRef.address must be("hello-world-2") + + (actorRef ? "Hello").as[String].get must be("World from node [node2]") + } + + barrier("store-3-in-node-1", NrOfNodes) { + } + + barrier("use-3-in-node-2", NrOfNodes) { + val actorOrOption = node.use("hello-world-3") + if (actorOrOption.isEmpty) fail("Actor could not be retrieved") + + val actorRef = actorOrOption.get + actorRef.address must be("hello-world-3") + + (actorRef ? "Count").as[Int].get must be(4) } node.shutdown()