Fixed bug in Cluster; registration of actor address per node mapping was done in wrong order and using ephemeral node.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-08-26 08:23:56 +02:00
parent 9ade2d7f57
commit 2646ecd14a

View file

@ -735,8 +735,6 @@ class DefaultClusterNode private[akka] (
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = {
val nodeName = nodeAddress.nodeName
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName)))
val actorFactoryPath = actorAddressRegistryPathFor(actorAddress)
zkClient.retryUntilConnected(new Callable[Either[Exception, () LocalActorRef]]() {
def call: Either[Exception, () LocalActorRef] = {
@ -805,6 +803,9 @@ class DefaultClusterNode private[akka] (
// create ADDRESS -> UUIDs mapping
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorAddress, uuid)))
// create ADDRESS -> NODE mapping
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToNodesPathFor(actorAddress, nodeName)))
actorRef.start()
actorRef
}
@ -1182,10 +1183,8 @@ class DefaultClusterNode private[akka] (
}
} catch {
case e: Exception
val error = new ClusterException(
"Remote command to [%s] timed out".format(connection.address))
EventHandler.error(error, this, error.toString)
throw error
EventHandler.error(e, this, "Could not send remote command to [%s] due to: %s".format(connection.address, e.toString))
throw e
}
}
}
@ -1477,8 +1476,6 @@ class DefaultClusterNode private[akka] (
val remoteAddress = remoteSocketAddressForNode(to.nodeName).getOrElse(throw new ClusterException("No remote address registered for [" + to.nodeName + "]"))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, to.nodeName)))
ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, from.nodeName)))
// FIXME who takes care of this line?
@ -1637,8 +1634,9 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
val disconnectedConnections = self.connectToAllNewlyArrivedMembershipNodesInCluster(newlyConnectedMembershipNodes, newlyDisconnectedMembershipNodes)
// if node(s) left cluster then migrate actors residing on the failed node
if (!newlyDisconnectedMembershipNodes.isEmpty)
if (!newlyDisconnectedMembershipNodes.isEmpty) {
self.migrateActorsOnFailedNodes(newlyDisconnectedMembershipNodes, currentClusterNodes, oldClusterNodes.toList, disconnectedConnections)
}
// publish NodeConnected and NodeDisconnect events to the listeners
newlyConnectedMembershipNodes foreach (node self.publish(NodeConnected(node)))
@ -1854,6 +1852,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
} else {
EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
}
self.reply(Success(cluster.remoteServerAddress.toString))
} catch {
case error: Throwable