diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 0bf7ebeba1..43cb115b4d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -57,12 +57,6 @@ sealed trait ClusterMessage extends Serializable */ object ClusterUserAction { - /** - * Command to initiate join another node (represented by 'address'). - * Join will be sent to the other node. - */ - case class JoinTo(address: Address) extends ClusterMessage - /** * Command to join the cluster. Sent when a node (represented by 'address') * wants to join another node (the receiver). @@ -84,7 +78,13 @@ object ClusterUserAction { /** * INTERNAL API */ -object InternalClusterAction { +private[cluster] object InternalClusterAction { + + /** + * Command to initiate join another node (represented by 'address'). + * Join will be sent to the other node. + */ + case class JoinTo(address: Address) extends ClusterMessage /** * Start message of the process to join one of the seed nodes. @@ -134,24 +134,22 @@ object InternalClusterAction { } /** + * INTERNAL API. + * * Cluster commands sent by the LEADER. */ -object ClusterLeaderAction { +private[cluster] object ClusterLeaderAction { /** - * INTERNAL API. - * * Command to mark a node to be removed from the cluster immediately. * Can only be sent by the leader. */ - private[cluster] case class Exit(address: Address) extends ClusterMessage + case class Exit(address: Address) extends ClusterMessage /** - * INTERNAL API. - * * Command to remove a node from the cluster immediately. */ - private[cluster] case class Remove(address: Address) extends ClusterMessage + case class Remove(address: Address) extends ClusterMessage } /** @@ -679,25 +677,25 @@ private[cluster] final class ClusterCore(cluster: Cluster) extends Actor with Ac } def receive = { - case JoinSeedNode ⇒ joinSeedNode() - case InitJoin ⇒ initJoin() - case InitJoinAck(address) ⇒ join(address) - case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() - case ClusterUserAction.JoinTo(address) ⇒ join(address) - case ClusterUserAction.Join(address) ⇒ joining(address) - case ClusterUserAction.Down(address) ⇒ downing(address) - case ClusterUserAction.Leave(address) ⇒ leaving(address) - case Exit(address) ⇒ exiting(address) - case Remove(address) ⇒ removing(address) - case msg: GossipEnvelope ⇒ receiveGossip(msg) - case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) - case GossipTick ⇒ gossip() - case HeartbeatTick ⇒ heartbeat() - case ReapUnreachableTick ⇒ reapUnreachableMembers() - case LeaderActionsTick ⇒ leaderActions() - case SendGossipTo(address) ⇒ gossipTo(address) - case PublishStateTick ⇒ publishState() - case p: Ping ⇒ ping(p) + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ initJoin() + case InitJoinAck(address) ⇒ join(address) + case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() + case JoinTo(address) ⇒ join(address) + case ClusterUserAction.Join(address) ⇒ joining(address) + case ClusterUserAction.Down(address) ⇒ downing(address) + case ClusterUserAction.Leave(address) ⇒ leaving(address) + case Exit(address) ⇒ exiting(address) + case Remove(address) ⇒ removing(address) + case msg: GossipEnvelope ⇒ receiveGossip(msg) + case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) + case GossipTick ⇒ gossip() + case HeartbeatTick ⇒ heartbeat() + case ReapUnreachableTick ⇒ reapUnreachableMembers() + case LeaderActionsTick ⇒ leaderActions() + case SendGossipTo(address) ⇒ gossipTo(address) + case PublishStateTick ⇒ publishState() + case p: Ping ⇒ ping(p) } @@ -1341,10 +1339,10 @@ private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends A val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)). withDispatcher(configuredDispatcher), name = "heartbeat") - def receive = Actor.emptyBehavior + def receive = { + case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core + } - override def unhandled(unknown: Any): Unit = log.error("[{}] can not respond to messages - received [{}]", - self.path, unknown) } /** @@ -1474,20 +1472,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { - implicit val timeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) - val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)). + system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemonSupervisor(this)). withDispatcher(UseDispatcher), name = "cluster") - Await.result(system.systemGuardian ? createChild, timeout.duration) match { - case a: ActorRef ⇒ a - case e: Exception ⇒ throw e - } } /** * INTERNAL API */ - private[cluster] def clusterCore: ActorRef = - system.actorFor(clusterDaemons.path / "core") + private[cluster] val clusterCore: ActorRef = { + implicit val timeout = system.settings.CreationTimeout + Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration) + } system.registerOnTermination(shutdown()) @@ -1584,7 +1579,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) * A 'Join(thisNodeAddress)' command is sent to the node to join. */ def join(address: Address): Unit = - clusterCore ! ClusterUserAction.JoinTo(address) + clusterCore ! InternalClusterAction.JoinTo(address) /** * Send command to issue state transition to LEAVING for the node specified by 'address'.