diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 0e4432159a..0934452c54 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -91,33 +91,22 @@ object ChangeListener { * * @author Jonas Bonér */ -class NodeAddress( - val clusterName: String, - val nodeName: String, - val hostname: String, - val port: Int) { - if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string") - if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string") +class NodeAddress(val clusterName: String, val nodeName: String) { if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string") - if (port < 1) throw new NullPointerException("Port can not be negative") + if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string") - override def toString = "%s:%s:%s:%s".format(clusterName, nodeName, hostname, port) + override def toString = "%s:%s".format(clusterName, nodeName) - override def hashCode = 0 + clusterName.## + nodeName.## + hostname.## + port.## + override def hashCode = 0 + clusterName.## + nodeName.## override def equals(other: Any) = NodeAddress.unapply(this) == NodeAddress.unapply(other) } object NodeAddress { - - def apply( - clusterName: String = Config.clusterName, - nodeName: String = Config.nodename, - hostname: String = Config.hostname, - port: Int = Config.remoteServerPort): NodeAddress = new NodeAddress(clusterName, nodeName, hostname, port) + def apply(clusterName: String = Config.clusterName, nodeName: String = Config.nodename): NodeAddress = new NodeAddress(clusterName, nodeName) def unapply(other: Any) = other match { - case address: NodeAddress ⇒ Some((address.clusterName, address.nodeName, address.hostname, address.port)) + case address: NodeAddress ⇒ Some((address.clusterName, address.nodeName)) case _ ⇒ None } } @@ -483,12 +472,13 @@ trait ClusterNode { */ def removeConfigElement(key: String) + /** + * Returns a list with all config element keys. + */ def getConfigElementKeys: Array[String] private[cluster] def initializeNode() - private[cluster] def addressForNode(node: String): Option[InetSocketAddress] - private[cluster] def publish(change: ChangeNotification) private[cluster] def findFailedNodes(nodes: List[String]): List[String] @@ -501,15 +491,13 @@ trait ClusterNode { private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String] - private[cluster] def joinMembershipPath() - - private[cluster] def joinActorsAtAddressPath() + private[cluster] def joinCluster() private[cluster] def joinLeaderElection: Boolean private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) - private[cluster] def migrateFromFailedNodes[T <: Actor](currentSetOfClusterNodes: List[String]) + private[cluster] def automaticMigrationFromFailedNodes() private[cluster] def membershipPathFor(node: String): String @@ -534,5 +522,9 @@ trait ClusterNode { private[cluster] def actorRegistryNodePathFor(uuid: UUID): String private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String + + private[cluster] def remoteSocketAddressForNode(node: String): InetSocketAddress + + private[cluster] def createActorsAtAddressPath() } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index d0e21c3dd4..c8778ac9d6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -164,14 +164,14 @@ object Cluster { /** * The node address. */ - val nodeAddress = NodeAddress(name, nodename, hostname, port) + val nodeAddress = NodeAddress(name, nodename) /** * The reference to the running ClusterNode. */ val node = { if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null") - new DefaultClusterNode(nodeAddress, zooKeeperServers, defaultSerializer) + new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultSerializer) } /** @@ -273,13 +273,17 @@ object Cluster { */ class DefaultClusterNode private[akka] ( val nodeAddress: NodeAddress, + val hostname: String = Config.hostname, + val port: Int = Config.remoteServerPort, val zkServerAddresses: String, val serializer: ZkSerializer) extends ErrorHandler with ClusterNode { self ⇒ + if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string") + if (port < 1) throw new NullPointerException("Port can not be negative") if (nodeAddress eq null) throw new IllegalArgumentException("'nodeAddress' can not be 'null'") - val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster") + val clusterJmxObjectName = JMX.nameFor(hostname, "monitoring", "cluster") import Cluster._ @@ -303,7 +307,7 @@ class DefaultClusterNode private[akka] ( lazy val remoteService: RemoteSupport = { val remote = new akka.remote.netty.NettyRemoteSupport - remote.start(nodeAddress.hostname, nodeAddress.port) + remote.start(hostname, port) remote.register(RemoteClusterDaemon.ADDRESS, remoteDaemon) remote.addListener(remoteClientLifeCycleListener) remote @@ -336,7 +340,7 @@ class DefaultClusterNode private[akka] ( def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]] - private[akka] val replicaConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = + private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]] // zookeeper listeners @@ -388,7 +392,7 @@ class DefaultClusterNode private[akka] ( locallyCachedMembershipNodes.clear() locallyCheckedOutActors.clear() - replicaConnections.toList.foreach({ + nodeConnections.toList.foreach({ case (_, (address, _)) ⇒ Actor.remote.shutdownClientConnection(address) // shut down client connections }) @@ -401,7 +405,7 @@ class DefaultClusterNode private[akka] ( // for monitoring remote listener registry.local.actors.filter(remoteService.hasListener).foreach(_.stop()) - replicaConnections.clear() + nodeConnections.clear() disconnect() EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress)) @@ -670,23 +674,7 @@ class DefaultClusterNode private[akka] ( .setActorUuid(uuidToUuidProtocol(uuid)) .build - replicaConnectionsForReplicationFactor(replicationFactor) foreach { connection ⇒ - (connection ? (command, remoteDaemonAckTimeout)).as[Status] match { - - case Some(Success) ⇒ - EventHandler.debug(this, "Replica for [%s] successfully created".format(actorRef.address)) - - case Some(Failure(cause)) ⇒ - EventHandler.error(cause, this, cause.toString) - throw cause - - case None ⇒ - val error = new ClusterException( - "Operation to instantiate replicas throughout the cluster timed out") - EventHandler.error(error, this, error.toString) - throw error - } - } + nodeConnectionsForReplicationFactor(replicationFactor) foreach { connection ⇒ sendCommandToReplica(connection, command, async = false) } this } else throw new ClusterException("Not connected to cluster") @@ -804,30 +792,36 @@ class DefaultClusterNode private[akka] ( EventHandler.debug(this, "Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid)) + connectToAllNewlyArrivedMembershipNodesInCluster() + val command = RemoteDaemonMessageProtocol.newBuilder .setMessageType(USE) .setActorUuid(uuidToUuidProtocol(uuid)) .build membershipNodes foreach { node ⇒ - replicaConnections.get(node) foreach { - case (_, connection) ⇒ connection ! command + nodeConnections.get(node) foreach { + case (_, connection) ⇒ sendCommandToReplica(connection, command, async = false) } } } } /** - * Using (checking out) specific UUID on a specefic node. + * Using (checking out) specific UUID on a specific node. */ def useActorOnNode(node: String, uuid: UUID) { isConnected ifOn { - replicaConnections.get(node) foreach { + + connectToAllNewlyArrivedMembershipNodesInCluster() + + nodeConnections.get(node) foreach { case (_, connection) ⇒ - connection ! RemoteDaemonMessageProtocol.newBuilder + val command = RemoteDaemonMessageProtocol.newBuilder .setMessageType(USE) .setActorUuid(uuidToUuidProtocol(uuid)) .build + sendCommandToReplica(connection, command, async = false) } } } @@ -866,14 +860,17 @@ class DefaultClusterNode private[akka] ( isConnected ifOn { EventHandler.debug(this, "Releasing (checking in) all actors with UUID [%s] on all nodes in cluster".format(uuid)) + + connectToAllNewlyArrivedMembershipNodesInCluster() + val command = RemoteDaemonMessageProtocol.newBuilder .setMessageType(RELEASE) .setActorUuid(uuidToUuidProtocol(uuid)) .build + nodesForActorsInUseWithUuid(uuid) foreach { node ⇒ - replicaConnections.get(node) foreach { - case (_, connection) ⇒ - connection ! command + nodeConnections.get(node) foreach { + case (_, connection) ⇒ sendCommandToReplica(connection, command, async = true) } } } @@ -1084,7 +1081,7 @@ class DefaultClusterNode private[akka] ( .setMessageType(FUNCTION_FUN0_UNIT) .setPayload(ByteString.copyFrom(bytes)) .build - replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message) + nodeConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message) } } @@ -1100,7 +1097,7 @@ class DefaultClusterNode private[akka] ( .setMessageType(FUNCTION_FUN0_ANY) .setPayload(ByteString.copyFrom(bytes)) .build - val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message) + val results = nodeConnectionsForReplicationFactor(replicationFactor) map (_ ? message) results.toList.asInstanceOf[List[Future[Any]]] } } @@ -1117,7 +1114,7 @@ class DefaultClusterNode private[akka] ( .setMessageType(FUNCTION_FUN1_ARG_UNIT) .setPayload(ByteString.copyFrom(bytes)) .build - replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message) + nodeConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message) } } @@ -1134,7 +1131,7 @@ class DefaultClusterNode private[akka] ( .setMessageType(FUNCTION_FUN1_ARG_ANY) .setPayload(ByteString.copyFrom(bytes)) .build - val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message) + val results = nodeConnectionsForReplicationFactor(replicationFactor) map (_ ? message) results.toList.asInstanceOf[List[Future[Any]]] } } @@ -1192,12 +1189,37 @@ class DefaultClusterNode private[akka] ( } } + /** + * Returns a list with all config element keys. + */ def getConfigElementKeys: Array[String] = zkClient.getChildren(CONFIGURATION_PATH).toList.toArray.asInstanceOf[Array[String]] // ======================================= // Private // ======================================= + private def sendCommandToReplica(connection: ActorRef, command: RemoteDaemonMessageProtocol, async: Boolean = true) { + if (async) { + connection ! (command, remoteDaemonAckTimeout) + } else { + (connection ? (command, remoteDaemonAckTimeout)).as[Status] match { + + case Some(Success) ⇒ + EventHandler.debug(this, "Replica for [%s] successfully created".format(connection.address)) + + case Some(Failure(cause)) ⇒ + EventHandler.error(cause, this, cause.toString) + throw cause + + case None ⇒ + val error = new ClusterException( + "Operation to instantiate replicas throughout the cluster timed out") + EventHandler.error(error, this, error.toString) + throw error + } + } + } + private[cluster] def membershipPathFor(node: String) = "%s/%s".format(MEMBERSHIP_PATH, node) private[cluster] def configurationPathFor(key: String) = "%s/%s".format(CONFIGURATION_PATH, key) @@ -1232,32 +1254,18 @@ class DefaultClusterNode private[akka] ( "\n\tport = [%s]" + "\n\tzookeeper server addresses = [%s]" + "\n\tserializer = [%s]") - .format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer)) + .format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer)) EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString)) createRootClusterNode() val isLeader = joinLeaderElection() if (isLeader) createNodeStructureIfNeeded() registerListeners() - joinMembershipPath() - joinActorsAtAddressPath() + joinCluster() + createActorsAtAddressPath() fetchMembershipNodes() EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress)) } - private[cluster] def addressForNode(node: String): Option[InetSocketAddress] = { - try { - val address = zkClient.readData(membershipPathFor(node)).asInstanceOf[String] - val tokenizer = new java.util.StringTokenizer(address, ":") - tokenizer.nextToken // cluster name - tokenizer.nextToken // node name - val hostname = tokenizer.nextToken // hostname - val port = tokenizer.nextToken.toInt // port - Some(new InetSocketAddress(hostname, port)) - } catch { - case e: ZkNoNodeException ⇒ None - } - } - private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] = uuidsForActorAddress(actorAddress) filter (_ ne null) @@ -1265,14 +1273,14 @@ class DefaultClusterNode private[akka] ( * Returns a random set with replica connections of size 'replicationFactor'. * Default replicationFactor is 0, which returns the empty set. */ - private def replicaConnectionsForReplicationFactor(replicationFactor: Int = 0): Set[ActorRef] = { + private def nodeConnectionsForReplicationFactor(replicationFactor: Int = 0): Set[ActorRef] = { var replicas = HashSet.empty[ActorRef] if (replicationFactor < 1) return replicas - connectToAllMembershipNodesInCluster() + connectToAllNewlyArrivedMembershipNodesInCluster() - val numberOfReplicas = replicaConnections.size - val replicaConnectionsAsArray = replicaConnections.toList map { + val numberOfReplicas = nodeConnections.size + val nodeConnectionsAsArray = nodeConnections.toList map { case (node, (address, actorRef)) ⇒ actorRef } // the ActorRefs @@ -1281,12 +1289,12 @@ class DefaultClusterNode private[akka] ( "Replication factor [" + replicationFactor + "] is greater than the number of available nodes [" + numberOfReplicas + "]") } else if (numberOfReplicas == replicationFactor) { - replicas = replicas ++ replicaConnectionsAsArray + replicas = replicas ++ nodeConnectionsAsArray } else { val random = new java.util.Random(System.currentTimeMillis) while (replicas.size < replicationFactor) { val index = random.nextInt(numberOfReplicas) - replicas = replicas + replicaConnectionsAsArray(index) + replicas = replicas + nodeConnectionsAsArray(index) } } replicas @@ -1295,29 +1303,29 @@ class DefaultClusterNode private[akka] ( /** * Connect to all available replicas unless already connected). */ - private def connectToAllMembershipNodesInCluster() { + private def connectToAllNewlyArrivedMembershipNodesInCluster() { membershipNodes foreach { node ⇒ if ((node != Config.nodename)) { // no replica on the "home" node of the ref - if (!replicaConnections.contains(node)) { // only connect to each replica once - val addressOption = addressForNode(node) + if (!nodeConnections.contains(node)) { // only connect to each replica once + val addressOption = remoteSocketAddressForNode(node) if (addressOption.isDefined) { val address = addressOption.get EventHandler.debug(this, - "Connecting to replica with nodename [%s] and address [%s]".format(node, address)) + "Setting up connection to node with nodename [%s] and address [%s]".format(node, address)) val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort) - replicaConnections.put(node, (address, clusterDaemon)) + nodeConnections.put(node, (address, clusterDaemon)) } } } } } - private[cluster] def joinMembershipPath() { + private[cluster] def joinCluster() { nodeNameToAddress += (nodeAddress.nodeName -> remoteServerAddress) try { EventHandler.info(this, "Joining cluster as membership node [%s] on [%s]".format(nodeAddress, membershipNodePath)) - zkClient.createEphemeral(membershipNodePath, nodeAddress.toString) + zkClient.createEphemeral(membershipNodePath, remoteServerAddress) } catch { case e: ZkNodeExistsException ⇒ val error = new ClusterException("Can't join the cluster. The node name [" + nodeAddress.nodeName + "] is already in by another node") @@ -1326,10 +1334,6 @@ class DefaultClusterNode private[akka] ( } } - private[cluster] def joinActorsAtAddressPath() { - ignore[ZkNodeExistsException](zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName))) - } - private[cluster] def joinLeaderElection(): Boolean = { EventHandler.info(this, "Node [%s] is joining leader election".format(nodeAddress.nodeName)) try { @@ -1339,12 +1343,22 @@ class DefaultClusterNode private[akka] ( } } + private[cluster] def remoteSocketAddressForNode(node: String): InetSocketAddress = { + zkClient.readData(membershipPathFor(node), new Stat).asInstanceOf[InetSocketAddress] + } + + private[cluster] def createActorsAtAddressPath() { + ignore[ZkNodeExistsException](zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName))) + } + private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) { clusterActorRefs.values(from) foreach (_.failOver(from, to)) } - private[cluster] def migrateFromFailedNodes[T <: Actor](currentSetOfClusterNodes: List[String]) = { - findFailedNodes(currentSetOfClusterNodes).foreach { failedNodeName ⇒ + // FIXME makes use of automaticMigrationFromFailedNodes method, why is it not used? + private[cluster] def automaticMigrationFromFailedNodes() { + connectToAllNewlyArrivedMembershipNodesInCluster() + findFailedNodes(membershipNodes.toList).foreach { failedNodeName ⇒ val allNodes = locallyCachedMembershipNodes.toList val myIndex = allNodes.indexWhere(_.endsWith(nodeAddress.nodeName)) @@ -1396,9 +1410,8 @@ class DefaultClusterNode private[akka] ( .setPayload(ByteString.copyFrom(bytes)) .build membershipNodes foreach { node ⇒ - replicaConnections.get(node) foreach { - case (_, connection) ⇒ - connection ! command + nodeConnections.get(node) foreach { + case (_, connection) ⇒ sendCommandToReplica(connection, command, async = true) } } } @@ -1412,6 +1425,8 @@ class DefaultClusterNode private[akka] ( private def migrateWithoutCheckingThatActorResidesOnItsHomeNode( from: NodeAddress, to: NodeAddress, actorAddress: String) { + EventHandler.debug(this, "Migrating actor [%s] from node [%s] to node [%s]".format(actorAddress, from, to)) + actorUuidsForActorAddress(actorAddress) map { uuid ⇒ val actorAddressOption = actorAddressForUuid(uuid) if (actorAddressOption.isDefined) { @@ -1420,9 +1435,8 @@ class DefaultClusterNode private[akka] ( if (!isInUseOnNode(actorAddress, to)) { release(actorAddress) - val newAddress = new InetSocketAddress(to.hostname, to.port) ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid))) - ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress))) + ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, remoteSocketAddressForNode(to.nodeName)))) ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to))) ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid))) @@ -1505,9 +1519,9 @@ class DefaultClusterNode private[akka] ( override def isConnected = self.isConnected.isOn - override def getRemoteServerHostname = self.nodeAddress.hostname + override def getRemoteServerHostname = self.hostname - override def getRemoteServerPort = self.nodeAddress.port + override def getRemoteServerPort = self.port override def getNodeName = self.nodeAddress.nodeName @@ -1563,7 +1577,7 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E "MembershipChildListener at [%s] has children [%s]" .format(self.nodeAddress.nodeName, childList.mkString(" "))) self.findNewlyConnectedMembershipNodes(childList) foreach { name ⇒ - self.addressForNode(name) foreach (address ⇒ self.nodeNameToAddress += (name -> address)) // update 'nodename-address' map + self.remoteSocketAddressForNode(name) foreach (address ⇒ self.nodeNameToAddress += (name -> address)) // update 'nodename-address' map self.publish(NodeConnected(name)) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 1af789d666..220f136072 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -305,129 +305,7 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with node2.stop } - "be able to replicate an actor" in { - // create actor - val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start - val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "replicate-actor-1", port = 9001)).start - val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "replicate-actor-2", port = 9002)).start - val node3 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "replicate-actor-3", port = 9003)).start - - Thread.sleep(500) - - // register actor - import BinaryFormatMyJavaSerializableActor._ - val replicationFactor = 3 - node1.store(actorRef, replicationFactor) - - Thread.sleep(500) // since deployment is async (daemon ! command), we have to wait some before checking - - node1.isInUseOnNode(actorRef.address, node = NodeAddress("test-cluster", "replicate-actor-1", port = 9001)) must be(true) - node2.isInUseOnNode(actorRef.address, node = NodeAddress("test-cluster", "replicate-actor-2", port = 9002)) must be(true) - node3.isInUseOnNode(actorRef.address, node = NodeAddress("test-cluster", "replicate-actor-3", port = 9003)) must be(true) - - node1.stop - node2.stop - node3.stop - } - - "be able to create a reference to a replicated actor by address using Router.Direct routing" in { - Deployer.deploy(Deploy( - "actor-address", Direct, - Clustered(Home("localhost", 2552), NoReplicas, Stateless))) - val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start - - val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "router-direct-actor-by-id-1", port = 9001)).start - val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "router-direct-actor-by-id-2", port = 9002)).start - - Thread.sleep(500) - - import BinaryFormatMyJavaSerializableActor._ - val replicationFactor = 1 - node1.store(actorRef, replicationFactor) - - Thread.sleep(500) // since deployment is async (daemon ! command), we have to wait some before checking - - val ref = node1.ref(actorRef.address, router = Router.Direct) - - (ref !! "hello").getOrElse("_") must equal("world 1") - (ref !! "hello").getOrElse("_") must equal("world 2") - - node1.stop - node2.stop - } - - "be able to create a reference to a replicated actor by address using Router.Random routing" in { - // create actor - Deployer.deploy(Deploy( - "actor-address", Direct, - Clustered(Home("localhost", 2552), NoReplicas, Stateless))) - val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start - - val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "router-random-actor-by-id-1", port = 9001)).start - val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "router-random-actor-by-id-2", port = 9002)).start - val node3 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "router-random-actor-by-id-3", port = 9003)).start - - Thread.sleep(500) - - // register actor - import BinaryFormatMyJavaSerializableActor._ - val replicationFactor = 2 - node1.store(actorRef, replicationFactor) - - Thread.sleep(500) // since deployment is async (daemon ! command), we have to wait some before checking - - val ref = node1.ref(actorRef.address, router = Router.Random) - - (ref !! "hello").getOrElse("_") must equal("world 1") - - node1.stop - node2.stop - node3.stop - } - - "be able to create a reference to a replicated actor by address using Router.RoundRobin routing" in { - // create actor - val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start - - val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "router-round-robin-actor-by-id-1", port = 9001)).start - val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "router-round-robin-actor-by-id-2", port = 9002)).start - val node3 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "router-round-robin-actor-by-id-3", port = 9003)).start - - Thread.sleep(500) - - // register actor - import BinaryFormatMyJavaSerializableActor._ - val replicationFactor = 3 - node1.store(actorRef, replicationFactor) - - Thread.sleep(500) // since deployment is async (daemon ! command), we have to wait some before checking - - val ref = node1.ref(actorRef.address, router = Router.RoundRobin) - - node1.isInUseOnNode(actorRef.address, node = NodeAddress("test-cluster", "router-round-robin-actor-by-id-1", port = 9001)) must be(true) - node2.isInUseOnNode(actorRef.address, node = NodeAddress("test-cluster", "router-round-robin-actor-by-id-2", port = 9002)) must be(true) - node3.isInUseOnNode(actorRef.address, node = NodeAddress("test-cluster", "router-round-robin-actor-by-id-3", port = 9003)) must be(true) - - val addresses = node1.addressesForActor(actorRef.address) - addresses.length must equal(3) - - (ref !! "hello").getOrElse("_") must equal("world 1") - (ref !! "hello").getOrElse("_") must equal("world 1") - (ref !! "hello").getOrElse("_") must equal("world 1") - - (ref !! "hello").getOrElse("_") must equal("world 2") - (ref !! "hello").getOrElse("_") must equal("world 2") - (ref !! "hello").getOrElse("_") must equal("world 2") - - (ref !! "hello").getOrElse("_") must equal("world 3") - (ref !! "hello").getOrElse("_") must equal("world 3") - (ref !! "hello").getOrElse("_") must equal("world 3") - - node1.stop - node2.stop - node3.stop - } } override def beforeAll() = { @@ -440,53 +318,3 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with } } */ -/* - "be able to subscribe to actor location change events" in { - val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "test-node1", port = 9991) - val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "test-node2", port = 9992) - - val barrier = new CyclicBarrier(2) - - node2.register(ActorLocationsChildChange, new ChangeListener() { - def notify(node: ClusterNode) = barrier.await - }) - - try { - node1.start - node2.start - - // create actors - val actorRef1 = actorOf[MyJavaSerializableActor]("actor-address").start - val actorRef2 = actorOf[MyJavaSerializableActor]("actor-address").start - - // register actors - var serializeMailbox = true - import BinaryFormatMyJavaSerializableActor._ - node1.store(actorRef1, serializeMailbox) - node1.store(actorRef2, serializeMailbox) - - node1.isClustered(ActorAddress(actorRef1.uuid)) must be (true) - node1.uuidsForClusteredActors.exists(_ == actorRef1.uuid) must be (true) - - // check out actor - val actorRef1_2 = node1.use(actorRef1.uuid) - val actorRef2_2 = node1.use(actorRef2.uuid) - - // should migrate to node2 - node1.stop - node1.isRunning must be (false) - - barrier.await(20, TimeUnit.SECONDS) - - actorRef1.stop - actorRef2.stop - actorRef1_2.stop - actorRef2_2.stop - - } finally { - node2.stop - node2.isRunning must be (false) - } - } - -*/ diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmNode1.conf similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmNode1.conf rename to akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmNode1.conf diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmNode1.opts similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmNode1.opts rename to akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmNode1.opts diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmNode2.conf similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmNode2.conf rename to akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmNode2.conf diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmNode2.opts similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmNode2.opts rename to akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmNode2.opts diff --git a/akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmSpec.scala new file mode 100644 index 0000000000..e2a8a3e90d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/migration/api/MigrationApiMultiJvmSpec.scala @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.migration.api + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterAll + +import akka.actor._ +import Actor._ +import akka.cluster._ +import ChangeListener._ +import Cluster._ +import akka.config.Config +import akka.serialization.Serialization + +import java.util.concurrent._ + +object MigrationApiMultiJvmSpec { + var NrOfNodes = 2 + + class HelloWorld extends Actor with Serializable { + def receive = { + case "Hello" ⇒ + self.reply("World from node [" + Config.nodename + "]") + } + } +} + +class MigrationApiMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { + import MigrationApiMultiJvmSpec._ + + "A cluster" must { + + "be able to migrate an actor from one node to another" in { + + barrier("start-node-1", NrOfNodes) { + node.start() + } + + barrier("start-node-2", NrOfNodes) { + } + + barrier("store-1-in-node-1", NrOfNodes) { + val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x ⇒ fail("No serializer found"), s ⇒ s) + node.store(actorOf[HelloWorld]("hello-world"), serializer) + } + + barrier("use-1-in-node-2", NrOfNodes) { + } + + barrier("migrate-from-node2-to-node1", NrOfNodes) { + } + + barrier("check-actor-is-moved-to-node1", NrOfNodes) { + node.isInUseOnNode("hello-world") must be(true) + + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? "Hello").as[String].get must be("World from node [node1]") + } + } + } + + override def beforeAll() = { + startLocalCluster() + } + + override def afterAll() = { + shutdownLocalCluster() + } +} + +class MigrationApiMultiJvmNode2 extends WordSpec with MustMatchers { + import MigrationApiMultiJvmSpec._ + + "A cluster" must { + + "be able to migrate an actor from one node to another" in { + + barrier("start-node-1", NrOfNodes) { + } + + barrier("start-node-2", NrOfNodes) { + node.start() + } + + barrier("store-1-in-node-1", NrOfNodes) { + } + + barrier("use-1-in-node-2", NrOfNodes) { + val actorOrOption = node.use("hello-world") + if (actorOrOption.isEmpty) fail("Actor could not be retrieved") + + val actorRef = actorOrOption.get + actorRef.address must be("hello-world") + + (actorRef ? "Hello").as[String].get must be("World from node [node2]") + } + + barrier("migrate-from-node2-to-node1", NrOfNodes) { + node.migrate(NodeAddress(node.nodeAddress.clusterName, "node1"), "hello-world") + Thread.sleep(2000) + } + + barrier("check-actor-is-moved-to-node1", NrOfNodes) { + } + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode1.conf new file mode 100644 index 0000000000..480c30c09d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode1.conf @@ -0,0 +1 @@ +akka.event-handler-level = "DEBUG" diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode2.conf new file mode 100644 index 0000000000..480c30c09d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode2.conf @@ -0,0 +1 @@ +akka.event-handler-level = "DEBUG" diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmSpec.scala similarity index 96% rename from akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmSpec.scala rename to akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmSpec.scala index c98c19562a..64d3cad406 100644 --- a/akka-cluster/src/test/scala/akka/cluster/registry/store/RegistryStoreMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/registry/RegistryStoreMultiJvmSpec.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.cluster.registry.store +package akka.cluster.registry import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers @@ -147,7 +147,7 @@ class RegistryStoreMultiJvmNode2 extends WordSpec with MustMatchers { val actorRef = actorOrOption.get actorRef.address must be("hello-world-3") - (actorRef ? "Count").as[Int].get must be >= (2) // be conservative - can by 5 but also 2 if slow system + (actorRef ? ("Count", 30000)).as[Int].get must be >= (2) // be conservative - can by 5 but also 2 if slow system } node.shutdown() diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index c947eca63a..ceb24c849b 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -99,7 +99,6 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec object Dependencies { - // Compile lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2