From 549f33a3ffd75aa2f94e665d9bc874ed78ab0234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 17 Jun 2011 10:25:02 +0200 Subject: [PATCH] Improved error handling in Cluster.scala --- LICENSE | 4 +- .../scala/akka/cluster/ClusterInterface.scala | 6 +- .../src/main/scala/akka/cluster/Cluster.scala | 214 ++++++++++-------- .../src/main/scala/akka/cluster/Storage.scala | 2 +- 4 files changed, 127 insertions(+), 99 deletions(-) diff --git a/LICENSE b/LICENSE index c14a2bed2f..2a9d5f00c4 100755 --- a/LICENSE +++ b/LICENSE @@ -16,5 +16,5 @@ the License. --------------- -Licenses for dependency projects can be found here: -[http://doc.akka.io/licenses] +Licenses for dependency projects can be found here: +[http://akka.io/docs/akka/snapshot/project/licenses.html] diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 82155ebc90..68f8a2cc00 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -390,7 +390,7 @@ trait ClusterNode { /** * Returns the actor id for the actor with a specific UUID. */ - def actorAddressForUuid(uuid: UUID): String + def actorAddressForUuid(uuid: UUID): Option[String] /** * Returns the actor ids for all the actors with a specific UUID. @@ -456,7 +456,7 @@ trait ClusterNode { /** * Returns the config element for the key or NULL if no element exists under the key. */ - def getConfigElement(key: String): Array[Byte] + def getConfigElement(key: String): Option[Array[Byte]] def removeConfigElement(key: String) @@ -464,7 +464,7 @@ trait ClusterNode { private[cluster] def initializeNode() - private[cluster] def addressForNode(node: String): InetSocketAddress + private[cluster] def addressForNode(node: String): Option[InetSocketAddress] private[cluster] def publish(change: ChangeNotification) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c8fd8698f3..0fec08d25a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1,3 +1,4 @@ + /** * Copyright (C) 2009-2011 Scalable Solutions AB */ @@ -368,6 +369,7 @@ class DefaultClusterNode private[akka] ( lazy private[cluster] val leaderLock = new WriteLock( zkClient.connection.getZookeeper, LEADER_ELECTION_NODE, null, leaderElectionCallback) { + // ugly hack, but what do you do? <--- haha epic private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId") ownerIdField.setAccessible(true) @@ -622,44 +624,46 @@ class DefaultClusterNode private[akka] ( val actorRegistryPath = actorRegistryPathFor(uuid) // create UUID -> Array[Byte] for actor registry - if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper - else { - zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() { - def call: Either[String, Exception] = { - try { - Left(zkClient.connection.create(actorRegistryPath, actorBytes, CreateMode.PERSISTENT)) - } catch { - case e: KeeperException.NodeExistsException ⇒ Right(e) + try { + zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper + } catch { + case e: ZkNoNodeException ⇒ // if not stored yet, store the actor + zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() { + def call: Either[String, Exception] = { + try { + Left(zkClient.connection.create(actorRegistryPath, actorBytes, CreateMode.PERSISTENT)) + } catch { + case e: KeeperException.NodeExistsException ⇒ Right(e) + } } + }) match { + case Left(path) ⇒ path + case Right(exception) ⇒ actorRegistryPath } - }) match { - case Left(path) ⇒ path - case Right(exception) ⇒ actorRegistryPath - } - // create UUID -> Format registry - try { - zkClient.createPersistent(actorRegistryFormatPathFor(uuid), format) - } catch { - case e: ZkNodeExistsException ⇒ zkClient.writeData(actorRegistryFormatPathFor(uuid), format) - } + // create UUID -> Format registry + try { + zkClient.createPersistent(actorRegistryFormatPathFor(uuid), format) + } catch { + case e: ZkNodeExistsException ⇒ zkClient.writeData(actorRegistryFormatPathFor(uuid), format) + } - // create UUID -> ADDRESS registry - try { - zkClient.createPersistent(actorRegistryActorAddressPathFor(uuid), actorRef.address) - } catch { - case e: ZkNodeExistsException ⇒ zkClient.writeData(actorRegistryActorAddressPathFor(uuid), actorRef.address) - } + // create UUID -> ADDRESS registry + try { + zkClient.createPersistent(actorRegistryActorAddressPathFor(uuid), actorRef.address) + } catch { + case e: ZkNodeExistsException ⇒ zkClient.writeData(actorRegistryActorAddressPathFor(uuid), actorRef.address) + } - // create UUID -> Address registry - ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid))) + // create UUID -> Address registry + ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid))) - // create UUID -> Node registry - ignore[ZkNodeExistsException](zkClient.createPersistent(actorLocationsPathFor(uuid))) + // create UUID -> Node registry + ignore[ZkNodeExistsException](zkClient.createPersistent(actorLocationsPathFor(uuid))) - // create ADDRESS -> UUIDs registry - ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorRef.address))) - ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid))) + // create ADDRESS -> UUIDs registry + ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorRef.address))) + ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid))) } import RemoteClusterDaemon._ @@ -698,7 +702,8 @@ class DefaultClusterNode private[akka] ( locallyCheckedOutActors.remove(uuid) // warning: ordering matters here - ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(actorAddressForUuid(uuid)))) // FIXME remove ADDRESS to UUID mapping? + // FIXME remove ADDRESS to UUID mapping? + actorAddressForUuid(uuid) foreach (address ⇒ ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(address)))) ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid))) ignore[ZkNoNodeException](zkClient.deleteRecursive(actorRegistryPathFor(uuid))) ignore[ZkNoNodeException](zkClient.deleteRecursive(actorLocationsPathFor(uuid))) @@ -769,7 +774,7 @@ class DefaultClusterNode private[akka] ( Left(if (shouldCompressData) LZF.uncompress(zkClient.connection.readData(actorPath, new Stat, false)) else zkClient.connection.readData(actorPath, new Stat, false)) } catch { - case e: KeeperException.NodeExistsException ⇒ Right(e) + case e: KeeperException.NoNodeException ⇒ Right(e) } } }) match { @@ -922,18 +927,19 @@ class DefaultClusterNode private[akka] ( /** * Returns the actor id for the actor with a specific UUID. */ - def actorAddressForUuid(uuid: UUID): String = if (isConnected.isOn) { + def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) { try { - zkClient.readData(actorRegistryActorAddressPathFor(uuid)).asInstanceOf[String] + Some(zkClient.readData(actorRegistryActorAddressPathFor(uuid)).asInstanceOf[String]) } catch { - case e: ZkNoNodeException ⇒ "" + case e: ZkNoNodeException ⇒ None } - } else "" + } else None /** * Returns the actor ids for all the actors with a specific UUID. */ - def actorAddressForUuids(uuids: Array[UUID]): Array[String] = uuids map (actorAddressForUuid(_)) filter (_ != "") + def actorAddressForUuids(uuids: Array[UUID]): Array[String] = + uuids map (actorAddressForUuid(_)) filter (_.isDefined) map (_.get) /** * Returns the actor UUIDs for actor ID. @@ -1008,8 +1014,12 @@ class DefaultClusterNode private[akka] ( def formatForActor(actorAddress: String): Serializer = { val formats = actorUuidsForActorAddress(actorAddress) map { uuid ⇒ - zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Serializer] - } + try { + Some(zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Serializer]) + } catch { + case e: ZkNoNodeException ⇒ None + } + } filter (_.isDefined) map (_.get) if (formats.isEmpty) throw new IllegalStateException("No Serializer found for [%s]".format(actorAddress)) if (formats.forall(_ == formats.head) == false) throw new IllegalStateException("Multiple Serializer classes found for [%s]".format(actorAddress)) @@ -1136,10 +1146,10 @@ class DefaultClusterNode private[akka] ( /** * Returns the config element for the key or NULL if no element exists under the key. */ - def getConfigElement(key: String): Array[Byte] = try { - zkClient.connection.readData(configurationPathFor(key), new Stat, true) + def getConfigElement(key: String): Option[Array[Byte]] = try { + Some(zkClient.connection.readData(configurationPathFor(key), new Stat, true)) } catch { - case e: KeeperException.NoNodeException ⇒ null + case e: KeeperException.NoNodeException ⇒ None } def removeConfigElement(key: String) { @@ -1202,14 +1212,18 @@ class DefaultClusterNode private[akka] ( EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress)) } - private[cluster] def addressForNode(node: String): InetSocketAddress = { - val address = zkClient.readData(membershipPathFor(node)).asInstanceOf[String] - val tokenizer = new java.util.StringTokenizer(address, ":") - tokenizer.nextToken // cluster name - tokenizer.nextToken // node name - val hostname = tokenizer.nextToken // hostname - val port = tokenizer.nextToken.toInt // port - new InetSocketAddress(hostname, port) + private[cluster] def addressForNode(node: String): Option[InetSocketAddress] = { + try { + val address = zkClient.readData(membershipPathFor(node)).asInstanceOf[String] + val tokenizer = new java.util.StringTokenizer(address, ":") + tokenizer.nextToken // cluster name + tokenizer.nextToken // node name + val hostname = tokenizer.nextToken // hostname + val port = tokenizer.nextToken.toInt // port + Some(new InetSocketAddress(hostname, port)) + } catch { + case e: ZkNoNodeException ⇒ None + } } private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] = @@ -1253,11 +1267,14 @@ class DefaultClusterNode private[akka] ( membershipNodes foreach { node ⇒ if ((node != Config.nodename)) { // no replica on the "home" node of the ref if (!replicaConnections.contains(node)) { // only connect to each replica once - val address = addressForNode(node) - EventHandler.debug(this, - "Connecting to replica with nodename [%s] and address [%s]".format(node, address)) - val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort) - replicaConnections.put(node, (address, clusterDaemon)) + val addressOption = addressForNode(node) + if (addressOption.isDefined) { + val address = addressOption.get + EventHandler.debug(this, + "Connecting to replica with nodename [%s] and address [%s]".format(node, address)) + val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort) + replicaConnections.put(node, (address, clusterDaemon)) + } } } } @@ -1313,19 +1330,22 @@ class DefaultClusterNode private[akka] ( "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]" .format(failedNodeName, uuid, nodeAddress.nodeName)) - val actorAddress = actorAddressForUuid(uuidFrom(uuid)) - migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check - NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress) + val actorAddressOption = actorAddressForUuid(uuidFrom(uuid)) + if (actorAddressOption.isDefined) { + val actorAddress = actorAddressOption.get + migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check + NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress) - implicit val format: Serializer = formatForActor(actorAddress) - use(actorAddress) foreach { actor ⇒ - // FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)' - //actor.homeAddress = remoteServerAddress - val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress") - homeAddress.setAccessible(true) - homeAddress.set(actor, Some(remoteServerAddress)) + implicit val format: Serializer = formatForActor(actorAddress) + use(actorAddress) foreach { actor ⇒ + // FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)' + //actor.homeAddress = remoteServerAddress + val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress") + homeAddress.setAccessible(true) + homeAddress.set(actor, Some(remoteServerAddress)) - remoteService.register(actorAddress, actor) + remoteService.register(actorAddress, actor) + } } } @@ -1357,22 +1377,25 @@ class DefaultClusterNode private[akka] ( from: NodeAddress, to: NodeAddress, actorAddress: String) { actorUuidsForActorAddress(actorAddress) map { uuid ⇒ - val actorAddress = actorAddressForUuid(uuid) + val actorAddressOption = actorAddressForUuid(uuid) + if (actorAddressOption.isDefined) { + val actorAddress = actorAddressOption.get - if (!isInUseOnNode(actorAddress, to)) { - release(actorAddress) + if (!isInUseOnNode(actorAddress, to)) { + release(actorAddress) - val newAddress = new InetSocketAddress(to.hostname, to.port) - ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid))) - ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress))) - ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to))) - ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid))) + val newAddress = new InetSocketAddress(to.hostname, to.port) + ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid))) + ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress))) + ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to))) + ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid))) - ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, from))) - ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(from.nodeName, uuid))) + ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, from))) + ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(from.nodeName, uuid))) - // 'use' (check out) actor on the remote 'to' node - useActorOnNode(to.nodeName, uuid) + // 'use' (check out) actor on the remote 'to' node + useActorOnNode(to.nodeName, uuid) + } } } } @@ -1402,10 +1425,9 @@ class DefaultClusterNode private[akka] ( private def createNodeStructureIfNeeded() { baseNodes.foreach { path ⇒ try { - zkClient.create(path, null, CreateMode.PERSISTENT) + ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) EventHandler.debug(this, "Created node [%s]".format(path)) } catch { - case e: ZkNodeExistsException ⇒ {} // do nothing case e ⇒ val error = new ClusterException(e.toString) EventHandler.error(error, this) @@ -1419,6 +1441,11 @@ class DefaultClusterNode private[akka] ( zkClient.subscribeChildChanges(MEMBERSHIP_NODE, membershipListener) } + private def unregisterListeners() = { + zkClient.unsubscribeStateChanges(stateListener) + zkClient.unsubscribeChildChanges(MEMBERSHIP_NODE, membershipListener) + } + private def fetchMembershipChildrenNodes() { val membershipChildren = zkClient.getChildren(MEMBERSHIP_NODE) locallyCachedMembershipNodes.clear() @@ -1474,7 +1501,7 @@ class DefaultClusterNode private[akka] ( override def setConfigElement(key: String, value: String): Unit = self.setConfigElement(key, value.getBytes("UTF-8")) - override def getConfigElement(key: String) = new String(self.getConfigElement(key), "UTF-8") + override def getConfigElement(key: String) = new String(self.getConfigElement(key).getOrElse(Array[Byte]()), "UTF-8") override def removeConfigElement(key: String): Unit = self.removeConfigElement(key) @@ -1550,11 +1577,8 @@ class StateListener(self: ClusterNode) extends IZkStateListener { trait ErrorHandler { def withErrorHandler[T](body: ⇒ T) = { try { - body + ignore[ZkInterruptedException](body) } catch { - case e: org.I0Itec.zkclient.exception.ZkInterruptedException ⇒ { - /* ignore */ - } case e: Throwable ⇒ EventHandler.error(e, this, e.toString) throw e @@ -1595,20 +1619,22 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { case USE ⇒ try { if (message.hasActorUuid) { - val uuid = uuidProtocolToUuid(message.getActorUuid) - val address = cluster.actorAddressForUuid(uuid) - implicit val format: Serializer = cluster formatForActor address - val actors = cluster use address + for { + address ← cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) + format ← cluster.formatForActor(address) + } cluster.use(address, format) + } else if (message.hasActorAddress) { val address = message.getActorAddress - implicit val format: Serializer = cluster formatForActor address - val actors = cluster use address + cluster.formatForActor(address) foreach (format ⇒ cluster.use(address, format)) + } else { EventHandler.warning(this, "None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]" .format(message)) } self.reply(Success) + } catch { case error ⇒ self.reply(Failure(error)) @@ -1617,7 +1643,9 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { case RELEASE ⇒ if (message.hasActorUuid) { - cluster release cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) + cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒ + cluster.release(address) + } } else if (message.hasActorAddress) { cluster release message.getActorAddress } else { diff --git a/akka-cluster/src/main/scala/akka/cluster/Storage.scala b/akka-cluster/src/main/scala/akka/cluster/Storage.scala index b232ba9946..3d1533c490 100755 --- a/akka-cluster/src/main/scala/akka/cluster/Storage.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Storage.scala @@ -1,7 +1,7 @@ /** * Copyright (C) 2009-2011 Scalable Solutions AB */ - package akka.cluster +package akka.cluster import zookeeper.AkkaZkClient import akka.AkkaException