Improved error handling in Cluster.scala

This commit is contained in:
Jonas Bonér 2011-06-17 10:25:02 +02:00
parent 5bcbdb22eb
commit 549f33a3ff
4 changed files with 127 additions and 99 deletions

View file

@ -16,5 +16,5 @@ the License.
--------------- ---------------
Licenses for dependency projects can be found here: Licenses for dependency projects can be found here:
[http://doc.akka.io/licenses] [http://akka.io/docs/akka/snapshot/project/licenses.html]

View file

@ -390,7 +390,7 @@ trait ClusterNode {
/** /**
* Returns the actor id for the actor with a specific UUID. * Returns the actor id for the actor with a specific UUID.
*/ */
def actorAddressForUuid(uuid: UUID): String def actorAddressForUuid(uuid: UUID): Option[String]
/** /**
* Returns the actor ids for all the actors with a specific UUID. * Returns the actor ids for all the actors with a specific UUID.
@ -456,7 +456,7 @@ trait ClusterNode {
/** /**
* Returns the config element for the key or NULL if no element exists under the key. * Returns the config element for the key, or None if no element exists under the key.
*/ */
def getConfigElement(key: String): Array[Byte] def getConfigElement(key: String): Option[Array[Byte]]
def removeConfigElement(key: String) def removeConfigElement(key: String)
@ -464,7 +464,7 @@ trait ClusterNode {
private[cluster] def initializeNode() private[cluster] def initializeNode()
private[cluster] def addressForNode(node: String): InetSocketAddress private[cluster] def addressForNode(node: String): Option[InetSocketAddress]
private[cluster] def publish(change: ChangeNotification) private[cluster] def publish(change: ChangeNotification)

View file

@ -1,3 +1,4 @@
/** /**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/ */
@ -368,6 +369,7 @@ class DefaultClusterNode private[akka] (
lazy private[cluster] val leaderLock = new WriteLock( lazy private[cluster] val leaderLock = new WriteLock(
zkClient.connection.getZookeeper, LEADER_ELECTION_NODE, null, leaderElectionCallback) { zkClient.connection.getZookeeper, LEADER_ELECTION_NODE, null, leaderElectionCallback) {
// ugly hack, but what do you do? <--- haha epic // ugly hack, but what do you do? <--- haha epic
private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId") private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId")
ownerIdField.setAccessible(true) ownerIdField.setAccessible(true)
@ -622,44 +624,46 @@ class DefaultClusterNode private[akka] (
val actorRegistryPath = actorRegistryPathFor(uuid) val actorRegistryPath = actorRegistryPathFor(uuid)
// create UUID -> Array[Byte] for actor registry // create UUID -> Array[Byte] for actor registry
if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper try {
else { zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper
zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() { } catch {
def call: Either[String, Exception] = { case e: ZkNoNodeException // if not stored yet, store the actor
try { zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() {
Left(zkClient.connection.create(actorRegistryPath, actorBytes, CreateMode.PERSISTENT)) def call: Either[String, Exception] = {
} catch { try {
case e: KeeperException.NodeExistsException Right(e) Left(zkClient.connection.create(actorRegistryPath, actorBytes, CreateMode.PERSISTENT))
} catch {
case e: KeeperException.NodeExistsException Right(e)
}
} }
}) match {
case Left(path) path
case Right(exception) actorRegistryPath
} }
}) match {
case Left(path) path
case Right(exception) actorRegistryPath
}
// create UUID -> Format registry // create UUID -> Format registry
try { try {
zkClient.createPersistent(actorRegistryFormatPathFor(uuid), format) zkClient.createPersistent(actorRegistryFormatPathFor(uuid), format)
} catch { } catch {
case e: ZkNodeExistsException zkClient.writeData(actorRegistryFormatPathFor(uuid), format) case e: ZkNodeExistsException zkClient.writeData(actorRegistryFormatPathFor(uuid), format)
} }
// create UUID -> ADDRESS registry // create UUID -> ADDRESS registry
try { try {
zkClient.createPersistent(actorRegistryActorAddressPathFor(uuid), actorRef.address) zkClient.createPersistent(actorRegistryActorAddressPathFor(uuid), actorRef.address)
} catch { } catch {
case e: ZkNodeExistsException zkClient.writeData(actorRegistryActorAddressPathFor(uuid), actorRef.address) case e: ZkNodeExistsException zkClient.writeData(actorRegistryActorAddressPathFor(uuid), actorRef.address)
} }
// create UUID -> Address registry // create UUID -> Address registry
ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid))) ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
// create UUID -> Node registry // create UUID -> Node registry
ignore[ZkNodeExistsException](zkClient.createPersistent(actorLocationsPathFor(uuid))) ignore[ZkNodeExistsException](zkClient.createPersistent(actorLocationsPathFor(uuid)))
// create ADDRESS -> UUIDs registry // create ADDRESS -> UUIDs registry
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorRef.address))) ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorRef.address)))
ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid))) ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid)))
} }
import RemoteClusterDaemon._ import RemoteClusterDaemon._
@ -698,7 +702,8 @@ class DefaultClusterNode private[akka] (
locallyCheckedOutActors.remove(uuid) locallyCheckedOutActors.remove(uuid)
// warning: ordering matters here // warning: ordering matters here
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(actorAddressForUuid(uuid)))) // FIXME remove ADDRESS to UUID mapping? // FIXME remove ADDRESS to UUID mapping?
actorAddressForUuid(uuid) foreach (address ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(address))))
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid))) ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorRegistryPathFor(uuid))) ignore[ZkNoNodeException](zkClient.deleteRecursive(actorRegistryPathFor(uuid)))
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorLocationsPathFor(uuid))) ignore[ZkNoNodeException](zkClient.deleteRecursive(actorLocationsPathFor(uuid)))
@ -769,7 +774,7 @@ class DefaultClusterNode private[akka] (
Left(if (shouldCompressData) LZF.uncompress(zkClient.connection.readData(actorPath, new Stat, false)) Left(if (shouldCompressData) LZF.uncompress(zkClient.connection.readData(actorPath, new Stat, false))
else zkClient.connection.readData(actorPath, new Stat, false)) else zkClient.connection.readData(actorPath, new Stat, false))
} catch { } catch {
case e: KeeperException.NodeExistsException Right(e) case e: KeeperException.NoNodeException Right(e)
} }
} }
}) match { }) match {
@ -922,18 +927,19 @@ class DefaultClusterNode private[akka] (
/** /**
* Returns the actor id for the actor with a specific UUID. * Returns the actor id for the actor with a specific UUID.
*/ */
def actorAddressForUuid(uuid: UUID): String = if (isConnected.isOn) { def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) {
try { try {
zkClient.readData(actorRegistryActorAddressPathFor(uuid)).asInstanceOf[String] Some(zkClient.readData(actorRegistryActorAddressPathFor(uuid)).asInstanceOf[String])
} catch { } catch {
case e: ZkNoNodeException "" case e: ZkNoNodeException None
} }
} else "" } else None
/** /**
* Returns the actor ids for all the actors with a specific UUID. * Returns the actor ids for all the actors with a specific UUID.
*/ */
def actorAddressForUuids(uuids: Array[UUID]): Array[String] = uuids map (actorAddressForUuid(_)) filter (_ != "") def actorAddressForUuids(uuids: Array[UUID]): Array[String] =
uuids map (actorAddressForUuid(_)) filter (_.isDefined) map (_.get)
/** /**
* Returns the actor UUIDs for actor ID. * Returns the actor UUIDs for actor ID.
@ -1008,8 +1014,12 @@ class DefaultClusterNode private[akka] (
def formatForActor(actorAddress: String): Serializer = { def formatForActor(actorAddress: String): Serializer = {
val formats = actorUuidsForActorAddress(actorAddress) map { uuid val formats = actorUuidsForActorAddress(actorAddress) map { uuid
zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Serializer] try {
} Some(zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Serializer])
} catch {
case e: ZkNoNodeException None
}
} filter (_.isDefined) map (_.get)
if (formats.isEmpty) throw new IllegalStateException("No Serializer found for [%s]".format(actorAddress)) if (formats.isEmpty) throw new IllegalStateException("No Serializer found for [%s]".format(actorAddress))
if (formats.forall(_ == formats.head) == false) throw new IllegalStateException("Multiple Serializer classes found for [%s]".format(actorAddress)) if (formats.forall(_ == formats.head) == false) throw new IllegalStateException("Multiple Serializer classes found for [%s]".format(actorAddress))
@ -1136,10 +1146,10 @@ class DefaultClusterNode private[akka] (
/** /**
* Returns the config element for the key or NULL if no element exists under the key. * Returns the config element for the key, or None if no element exists under the key.
*/ */
def getConfigElement(key: String): Array[Byte] = try { def getConfigElement(key: String): Option[Array[Byte]] = try {
zkClient.connection.readData(configurationPathFor(key), new Stat, true) Some(zkClient.connection.readData(configurationPathFor(key), new Stat, true))
} catch { } catch {
case e: KeeperException.NoNodeException null case e: KeeperException.NoNodeException None
} }
def removeConfigElement(key: String) { def removeConfigElement(key: String) {
@ -1202,14 +1212,18 @@ class DefaultClusterNode private[akka] (
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress)) EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
} }
private[cluster] def addressForNode(node: String): InetSocketAddress = { private[cluster] def addressForNode(node: String): Option[InetSocketAddress] = {
val address = zkClient.readData(membershipPathFor(node)).asInstanceOf[String] try {
val tokenizer = new java.util.StringTokenizer(address, ":") val address = zkClient.readData(membershipPathFor(node)).asInstanceOf[String]
tokenizer.nextToken // cluster name val tokenizer = new java.util.StringTokenizer(address, ":")
tokenizer.nextToken // node name tokenizer.nextToken // cluster name
val hostname = tokenizer.nextToken // hostname tokenizer.nextToken // node name
val port = tokenizer.nextToken.toInt // port val hostname = tokenizer.nextToken // hostname
new InetSocketAddress(hostname, port) val port = tokenizer.nextToken.toInt // port
Some(new InetSocketAddress(hostname, port))
} catch {
case e: ZkNoNodeException None
}
} }
private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] = private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] =
@ -1253,11 +1267,14 @@ class DefaultClusterNode private[akka] (
membershipNodes foreach { node membershipNodes foreach { node
if ((node != Config.nodename)) { // no replica on the "home" node of the ref if ((node != Config.nodename)) { // no replica on the "home" node of the ref
if (!replicaConnections.contains(node)) { // only connect to each replica once if (!replicaConnections.contains(node)) { // only connect to each replica once
val address = addressForNode(node) val addressOption = addressForNode(node)
EventHandler.debug(this, if (addressOption.isDefined) {
"Connecting to replica with nodename [%s] and address [%s]".format(node, address)) val address = addressOption.get
val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort) EventHandler.debug(this,
replicaConnections.put(node, (address, clusterDaemon)) "Connecting to replica with nodename [%s] and address [%s]".format(node, address))
val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort)
replicaConnections.put(node, (address, clusterDaemon))
}
} }
} }
} }
@ -1313,19 +1330,22 @@ class DefaultClusterNode private[akka] (
"Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]" "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
.format(failedNodeName, uuid, nodeAddress.nodeName)) .format(failedNodeName, uuid, nodeAddress.nodeName))
val actorAddress = actorAddressForUuid(uuidFrom(uuid)) val actorAddressOption = actorAddressForUuid(uuidFrom(uuid))
migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check if (actorAddressOption.isDefined) {
NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress) val actorAddress = actorAddressOption.get
migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check
NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress)
implicit val format: Serializer = formatForActor(actorAddress) implicit val format: Serializer = formatForActor(actorAddress)
use(actorAddress) foreach { actor use(actorAddress) foreach { actor
// FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)' // FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)'
//actor.homeAddress = remoteServerAddress //actor.homeAddress = remoteServerAddress
val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress") val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress")
homeAddress.setAccessible(true) homeAddress.setAccessible(true)
homeAddress.set(actor, Some(remoteServerAddress)) homeAddress.set(actor, Some(remoteServerAddress))
remoteService.register(actorAddress, actor) remoteService.register(actorAddress, actor)
}
} }
} }
@ -1357,22 +1377,25 @@ class DefaultClusterNode private[akka] (
from: NodeAddress, to: NodeAddress, actorAddress: String) { from: NodeAddress, to: NodeAddress, actorAddress: String) {
actorUuidsForActorAddress(actorAddress) map { uuid actorUuidsForActorAddress(actorAddress) map { uuid
val actorAddress = actorAddressForUuid(uuid) val actorAddressOption = actorAddressForUuid(uuid)
if (actorAddressOption.isDefined) {
val actorAddress = actorAddressOption.get
if (!isInUseOnNode(actorAddress, to)) { if (!isInUseOnNode(actorAddress, to)) {
release(actorAddress) release(actorAddress)
val newAddress = new InetSocketAddress(to.hostname, to.port) val newAddress = new InetSocketAddress(to.hostname, to.port)
ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid))) ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress))) ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress)))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to))) ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to)))
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid))) ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, from))) ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, from)))
ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(from.nodeName, uuid))) ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(from.nodeName, uuid)))
// 'use' (check out) actor on the remote 'to' node // 'use' (check out) actor on the remote 'to' node
useActorOnNode(to.nodeName, uuid) useActorOnNode(to.nodeName, uuid)
}
} }
} }
} }
@ -1402,10 +1425,9 @@ class DefaultClusterNode private[akka] (
private def createNodeStructureIfNeeded() { private def createNodeStructureIfNeeded() {
baseNodes.foreach { path baseNodes.foreach { path
try { try {
zkClient.create(path, null, CreateMode.PERSISTENT) ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
EventHandler.debug(this, "Created node [%s]".format(path)) EventHandler.debug(this, "Created node [%s]".format(path))
} catch { } catch {
case e: ZkNodeExistsException {} // do nothing
case e case e
val error = new ClusterException(e.toString) val error = new ClusterException(e.toString)
EventHandler.error(error, this) EventHandler.error(error, this)
@ -1419,6 +1441,11 @@ class DefaultClusterNode private[akka] (
zkClient.subscribeChildChanges(MEMBERSHIP_NODE, membershipListener) zkClient.subscribeChildChanges(MEMBERSHIP_NODE, membershipListener)
} }
private def unregisterListeners() = {
zkClient.unsubscribeStateChanges(stateListener)
zkClient.unsubscribeChildChanges(MEMBERSHIP_NODE, membershipListener)
}
private def fetchMembershipChildrenNodes() { private def fetchMembershipChildrenNodes() {
val membershipChildren = zkClient.getChildren(MEMBERSHIP_NODE) val membershipChildren = zkClient.getChildren(MEMBERSHIP_NODE)
locallyCachedMembershipNodes.clear() locallyCachedMembershipNodes.clear()
@ -1474,7 +1501,7 @@ class DefaultClusterNode private[akka] (
override def setConfigElement(key: String, value: String): Unit = self.setConfigElement(key, value.getBytes("UTF-8")) override def setConfigElement(key: String, value: String): Unit = self.setConfigElement(key, value.getBytes("UTF-8"))
override def getConfigElement(key: String) = new String(self.getConfigElement(key), "UTF-8") override def getConfigElement(key: String) = new String(self.getConfigElement(key).getOrElse(Array[Byte]()), "UTF-8")
override def removeConfigElement(key: String): Unit = self.removeConfigElement(key) override def removeConfigElement(key: String): Unit = self.removeConfigElement(key)
@ -1550,11 +1577,8 @@ class StateListener(self: ClusterNode) extends IZkStateListener {
trait ErrorHandler { trait ErrorHandler {
def withErrorHandler[T](body: T) = { def withErrorHandler[T](body: T) = {
try { try {
body ignore[ZkInterruptedException](body)
} catch { } catch {
case e: org.I0Itec.zkclient.exception.ZkInterruptedException {
/* ignore */
}
case e: Throwable case e: Throwable
EventHandler.error(e, this, e.toString) EventHandler.error(e, this, e.toString)
throw e throw e
@ -1595,20 +1619,22 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case USE case USE
try { try {
if (message.hasActorUuid) { if (message.hasActorUuid) {
val uuid = uuidProtocolToUuid(message.getActorUuid) for {
val address = cluster.actorAddressForUuid(uuid) address cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid))
implicit val format: Serializer = cluster formatForActor address format cluster.formatForActor(address)
val actors = cluster use address } cluster.use(address, format)
} else if (message.hasActorAddress) { } else if (message.hasActorAddress) {
val address = message.getActorAddress val address = message.getActorAddress
implicit val format: Serializer = cluster formatForActor address cluster.formatForActor(address) foreach (format cluster.use(address, format))
val actors = cluster use address
} else { } else {
EventHandler.warning(this, EventHandler.warning(this,
"None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]" "None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]"
.format(message)) .format(message))
} }
self.reply(Success) self.reply(Success)
} catch { } catch {
case error case error
self.reply(Failure(error)) self.reply(Failure(error))
@ -1617,7 +1643,9 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case RELEASE case RELEASE
if (message.hasActorUuid) { if (message.hasActorUuid) {
cluster release cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address
cluster.release(address)
}
} else if (message.hasActorAddress) { } else if (message.hasActorAddress) {
cluster release message.getActorAddress cluster release message.getActorAddress
} else { } else {

View file

@ -1,7 +1,7 @@
/** /**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/ */
package akka.cluster package akka.cluster
import zookeeper.AkkaZkClient import zookeeper.AkkaZkClient
import akka.AkkaException import akka.AkkaException