diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 1f9e87d50b..3cfea4d22b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -707,6 +707,8 @@ class LocalActorRef private[akka] ( EventHandler.error(e, actor, messageHandle.message.toString) throw e } + } else { + // throwing away message if actor is shut down, no use throwing an exception in receiving actor's thread, isShutdown is enforced on caller side } } finally { guard.lock.unlock() diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 2254fbd956..fa28f0a4f3 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -102,6 +102,9 @@ class NodeAddress(val clusterName: String, val nodeName: String) { override def equals(other: Any) = NodeAddress.unapply(this) == NodeAddress.unapply(other) } +/** + * NodeAddress companion object and factory. + */ object NodeAddress { def apply(clusterName: String = Config.clusterName, nodeName: String = Config.nodename): NodeAddress = new NodeAddress(clusterName, nodeName) @@ -119,16 +122,7 @@ object NodeAddress { trait ClusterNode { import ChangeListener._ - def nodeAddress: NodeAddress - def zkServerAddresses: String - - def remoteClientLifeCycleListener: ActorRef - def remoteDaemon: ActorRef - def remoteService: RemoteSupport - def remoteServerAddress: InetSocketAddress - val isConnected = new Switch(false) - val electionNumber = new AtomicInteger(Int.MaxValue) private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]() private[cluster] val nodeNameToAddress: ConcurrentMap[String, InetSocketAddress] = new ConcurrentHashMap[String, InetSocketAddress] @@ -136,6 +130,14 @@ trait ClusterNode { def membershipNodes: Array[String] + def nodeAddress: NodeAddress + + def zkServerAddresses: String + + def remoteService: RemoteSupport + + def remoteServerAddress: InetSocketAddress + def isRunning: Boolean = isConnected.isOn def start(): ClusterNode @@ -293,11 +295,6 @@ trait ClusterNode { */ def remove(actorRef: ActorRef) - /** - * Removes actor with uuid from the cluster. - */ - def remove(uuid: UUID) - /** * Removes actor with address from the cluster. */ @@ -350,11 +347,6 @@ trait ClusterNode { */ def release(actorAddress: String) - /** - * Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'. - */ - def releaseActorOnAllNodes(uuid: UUID) - /** * Creates an ActorRef with a Router to a set of clustered actors. */ @@ -370,51 +362,16 @@ trait ClusterNode { */ def migrate(from: NodeAddress, to: NodeAddress, actorAddress: String) - /** - * Returns the UUIDs of all actors checked out on this node. - */ - def uuidsForActorsInUse: Array[UUID] - /** * Returns the addresses of all actors checked out on this node. */ def addressesForActorsInUse: Array[String] - /** - * Returns the UUIDs of all actors registered in this cluster. - */ - def uuidsForClusteredActors: Array[UUID] - /** * Returns the addresses of all actors registered in this cluster. */ def addressesForClusteredActors: Array[String] - /** - * Returns the actor id for the actor with a specific UUID. - */ - def actorAddressForUuid(uuid: UUID): Option[String] - - /** - * Returns the actor ids for all the actors with a specific UUID. - */ - def actorAddressForUuids(uuids: Array[UUID]): Array[String] - - /** - * Returns the actor UUIDs for actor ID. - */ - def uuidsForActorAddress(actorAddress: String): Array[UUID] - - /** - * Returns the node names of all actors in use with UUID. - */ - def nodesForActorsInUseWithUuid(uuid: UUID): Array[String] - - /** - * Returns the UUIDs of all actors in use registered on a specific node. - */ - def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] - /** * Returns the addresses of all actors in use registered on a specific node. */ @@ -477,6 +434,56 @@ trait ClusterNode { */ def getConfigElementKeys: Array[String] + // =============== PRIVATE METHODS =============== + + private[cluster] def remoteClientLifeCycleListener: ActorRef + private[cluster] def remoteDaemon: ActorRef + + /** + * Removes actor with uuid from the cluster. + */ + private[cluster] def remove(uuid: UUID) + + /** + * Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'. + */ + private[cluster] def releaseActorOnAllNodes(uuid: UUID) + + /** + * Returns the UUIDs of all actors checked out on this node. + */ + private[cluster] def uuidsForActorsInUse: Array[UUID] + + /** + * Returns the UUIDs of all actors registered in this cluster. + */ + private[cluster] def uuidsForClusteredActors: Array[UUID] + + /** + * Returns the actor id for the actor with a specific UUID. + */ + private[cluster] def actorAddressForUuid(uuid: UUID): Option[String] + + /** + * Returns the actor ids for all the actors with a specific UUID. + */ + private[cluster] def actorAddressForUuids(uuids: Array[UUID]): Array[String] + + /** + * Returns the actor UUIDs for actor ID. + */ + private[cluster] def uuidsForActorAddress(actorAddress: String): Array[UUID] + + /** + * Returns the node names of all actors in use with UUID. + */ + private[cluster] def nodesForActorsInUseWithUuid(uuid: UUID): Array[String] + + /** + * Returns the UUIDs of all actors in use registered on a specific node. + */ + private[cluster] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] + private[cluster] def initializeNode() private[cluster] def publish(change: ChangeNotification) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index f85d3484ff..1ef271f52b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -286,7 +286,7 @@ class DefaultClusterNode private[akka] ( import Cluster._ - lazy val remoteClientLifeCycleListener = localActorOf(new Actor { + private[cluster] lazy val remoteClientLifeCycleListener = localActorOf(new Actor { def receive = { case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule() case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule() @@ -294,9 +294,9 @@ class DefaultClusterNode private[akka] ( } }, "akka.cluster.RemoteClientLifeCycleListener").start() - lazy val remoteDaemon = localActorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start() + private[cluster] lazy val remoteDaemon = localActorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start() - lazy val remoteDaemonSupervisor = Supervisor( + private[cluster] lazy val remoteDaemonSupervisor = Supervisor( SupervisorConfig( OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want? Supervise( @@ -853,7 +853,7 @@ class DefaultClusterNode private[akka] ( /** * Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'. */ - def releaseActorOnAllNodes(uuid: UUID) { + private[akka] def releaseActorOnAllNodes(uuid: UUID) { isConnected ifOn { EventHandler.debug(this, "Releasing (checking in) all actors with UUID [%s] on all nodes in cluster".format(uuid)) @@ -914,7 +914,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the UUIDs of all actors checked out on this node. */ - def uuidsForActorsInUse: Array[UUID] = uuidsForActorsInUseOnNode(nodeAddress.nodeName) + private[akka] def uuidsForActorsInUse: Array[UUID] = uuidsForActorsInUseOnNode(nodeAddress.nodeName) /** * Returns the addresses of all actors checked out on this node. @@ -924,7 +924,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the UUIDs of all actors registered in this cluster. */ - def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) { + private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) { zkClient.getChildren(ACTOR_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]] } else Array.empty[UUID] @@ -936,7 +936,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the actor id for the actor with a specific UUID. */ - def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) { + private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) { try { Some(zkClient.readData(actorRegistryActorAddressPathFor(uuid)).asInstanceOf[String]) } catch { @@ -947,13 +947,13 @@ class DefaultClusterNode private[akka] ( /** * Returns the actor ids for all the actors with a specific UUID. */ - def actorAddressForUuids(uuids: Array[UUID]): Array[String] = + private[akka] def actorAddressForUuids(uuids: Array[UUID]): Array[String] = uuids map (actorAddressForUuid(_)) filter (_.isDefined) map (_.get) /** * Returns the actor UUIDs for actor ID. */ - def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) { + private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) { try { zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toArray map { case c: CharSequence ⇒ new UUID(c) @@ -966,7 +966,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the node names of all actors in use with UUID. */ - def nodesForActorsInUseWithUuid(uuid: UUID): Array[String] = if (isConnected.isOn) { + private[akka] def nodesForActorsInUseWithUuid(uuid: UUID): Array[String] = if (isConnected.isOn) { try { zkClient.getChildren(actorLocationsPathFor(uuid)).toArray.asInstanceOf[Array[String]] } catch { @@ -992,7 +992,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the UUIDs of all actors in use registered on a specific node. */ - def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) { + private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) { try { zkClient.getChildren(actorsAtNodePathFor(nodeName)).toArray map { case c: CharSequence ⇒ new UUID(c)