From 2646ecd14a716e7e7b78092d15bbf8243b98dc67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 26 Aug 2011 08:23:56 +0200 Subject: [PATCH] Fixed bug in Cluster; registration of actor address per node mapping was done in wrong order and using ephemeral node. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Cluster.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 45323b880e..19b0326cc7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -735,8 +735,6 @@ class DefaultClusterNode private[akka] ( def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = { val nodeName = nodeAddress.nodeName - ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName))) - val actorFactoryPath = actorAddressRegistryPathFor(actorAddress) zkClient.retryUntilConnected(new Callable[Either[Exception, () ⇒ LocalActorRef]]() { def call: Either[Exception, () ⇒ LocalActorRef] = { @@ -805,6 +803,9 @@ class DefaultClusterNode private[akka] ( // create ADDRESS -> UUIDs mapping ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorAddress, uuid))) + // create ADDRESS -> NODE mapping + ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToNodesPathFor(actorAddress, nodeName))) + actorRef.start() actorRef } @@ -1182,10 +1183,8 @@ class DefaultClusterNode private[akka] ( } } catch { case e: Exception ⇒ - val error = new ClusterException( - "Remote command to [%s] timed out".format(connection.address)) - EventHandler.error(error, this, error.toString) - throw error + EventHandler.error(e, this, "Could not send remote command to [%s] due to: %s".format(connection.address, e.toString)) + throw e } } } @@ -1477,8 +1476,6 @@ class DefaultClusterNode private[akka] ( val remoteAddress = remoteSocketAddressForNode(to.nodeName).getOrElse(throw new ClusterException("No remote address registered for [" + to.nodeName + "]")) - ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, to.nodeName))) - ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, from.nodeName))) // FIXME who takes care of this line? @@ -1637,8 +1634,9 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E val disconnectedConnections = self.connectToAllNewlyArrivedMembershipNodesInCluster(newlyConnectedMembershipNodes, newlyDisconnectedMembershipNodes) // if node(s) left cluster then migrate actors residing on the failed node - if (!newlyDisconnectedMembershipNodes.isEmpty) + if (!newlyDisconnectedMembershipNodes.isEmpty) { self.migrateActorsOnFailedNodes(newlyDisconnectedMembershipNodes, currentClusterNodes, oldClusterNodes.toList, disconnectedConnections) + } // publish NodeConnected and NodeDisconnect events to the listeners newlyConnectedMembershipNodes foreach (node ⇒ self.publish(NodeConnected(node))) @@ -1854,6 +1852,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { } else { EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message)) } + self.reply(Success(cluster.remoteServerAddress.toString)) } catch { case error: Throwable ⇒