Make Cluster ready for use before constructor returns, #2311

This commit is contained in:
Patrik Nordwall 2012-07-05 07:56:40 +02:00
parent 05015536b3
commit 6aa5f93f6e

View file

@ -57,12 +57,6 @@ sealed trait ClusterMessage extends Serializable
*/ */
object ClusterUserAction { object ClusterUserAction {
/**
* Command to initiate join another node (represented by 'address').
* Join will be sent to the other node.
*/
case class JoinTo(address: Address) extends ClusterMessage
/** /**
* Command to join the cluster. Sent when a node (represented by 'address') * Command to join the cluster. Sent when a node (represented by 'address')
* wants to join another node (the receiver). * wants to join another node (the receiver).
@ -84,7 +78,13 @@ object ClusterUserAction {
/** /**
* INTERNAL API * INTERNAL API
*/ */
object InternalClusterAction { private[cluster] object InternalClusterAction {
/**
* Command to initiate join another node (represented by 'address').
* Join will be sent to the other node.
*/
case class JoinTo(address: Address) extends ClusterMessage
/** /**
* Start message of the process to join one of the seed nodes. * Start message of the process to join one of the seed nodes.
@ -134,24 +134,22 @@ object InternalClusterAction {
} }
/** /**
* INTERNAL API.
*
* Cluster commands sent by the LEADER. * Cluster commands sent by the LEADER.
*/ */
object ClusterLeaderAction { private[cluster] object ClusterLeaderAction {
/** /**
* INTERNAL API.
*
* Command to mark a node to be removed from the cluster immediately. * Command to mark a node to be removed from the cluster immediately.
* Can only be sent by the leader. * Can only be sent by the leader.
*/ */
private[cluster] case class Exit(address: Address) extends ClusterMessage case class Exit(address: Address) extends ClusterMessage
/** /**
* INTERNAL API.
*
* Command to remove a node from the cluster immediately. * Command to remove a node from the cluster immediately.
*/ */
private[cluster] case class Remove(address: Address) extends ClusterMessage case class Remove(address: Address) extends ClusterMessage
} }
/** /**
@ -679,25 +677,25 @@ private[cluster] final class ClusterCore(cluster: Cluster) extends Actor with Ac
} }
def receive = { def receive = {
case JoinSeedNode joinSeedNode() case JoinSeedNode joinSeedNode()
case InitJoin initJoin() case InitJoin initJoin()
case InitJoinAck(address) join(address) case InitJoinAck(address) join(address)
case Failure(e: AskTimeoutException) joinSeedNodeTimeout() case Failure(e: AskTimeoutException) joinSeedNodeTimeout()
case ClusterUserAction.JoinTo(address) join(address) case JoinTo(address) join(address)
case ClusterUserAction.Join(address) joining(address) case ClusterUserAction.Join(address) joining(address)
case ClusterUserAction.Down(address) downing(address) case ClusterUserAction.Down(address) downing(address)
case ClusterUserAction.Leave(address) leaving(address) case ClusterUserAction.Leave(address) leaving(address)
case Exit(address) exiting(address) case Exit(address) exiting(address)
case Remove(address) removing(address) case Remove(address) removing(address)
case msg: GossipEnvelope receiveGossip(msg) case msg: GossipEnvelope receiveGossip(msg)
case msg: GossipMergeConflict receiveGossipMerge(msg) case msg: GossipMergeConflict receiveGossipMerge(msg)
case GossipTick gossip() case GossipTick gossip()
case HeartbeatTick heartbeat() case HeartbeatTick heartbeat()
case ReapUnreachableTick reapUnreachableMembers() case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions() case LeaderActionsTick leaderActions()
case SendGossipTo(address) gossipTo(address) case SendGossipTo(address) gossipTo(address)
case PublishStateTick publishState() case PublishStateTick publishState()
case p: Ping ping(p) case p: Ping ping(p)
} }
@ -1341,10 +1339,10 @@ private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends A
val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)). val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)).
withDispatcher(configuredDispatcher), name = "heartbeat") withDispatcher(configuredDispatcher), name = "heartbeat")
def receive = Actor.emptyBehavior def receive = {
case InternalClusterAction.GetClusterCoreRef sender ! core
}
override def unhandled(unknown: Any): Unit = log.error("[{}] can not respond to messages - received [{}]",
self.path, unknown)
} }
/** /**
@ -1474,20 +1472,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// create supervisor for daemons under path "/system/cluster" // create supervisor for daemons under path "/system/cluster"
private val clusterDaemons: ActorRef = { private val clusterDaemons: ActorRef = {
implicit val timeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemonSupervisor(this)).
val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)).
withDispatcher(UseDispatcher), name = "cluster") withDispatcher(UseDispatcher), name = "cluster")
Await.result(system.systemGuardian ? createChild, timeout.duration) match {
case a: ActorRef a
case e: Exception throw e
}
} }
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[cluster] def clusterCore: ActorRef = private[cluster] val clusterCore: ActorRef = {
system.actorFor(clusterDaemons.path / "core") implicit val timeout = system.settings.CreationTimeout
Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration)
}
system.registerOnTermination(shutdown()) system.registerOnTermination(shutdown())
@ -1584,7 +1579,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* A 'Join(thisNodeAddress)' command is sent to the node to join. * A 'Join(thisNodeAddress)' command is sent to the node to join.
*/ */
def join(address: Address): Unit = def join(address: Address): Unit =
clusterCore ! ClusterUserAction.JoinTo(address) clusterCore ! InternalClusterAction.JoinTo(address)
/** /**
* Send command to issue state transition to LEAVING for the node specified by 'address'. * Send command to issue state transition to LEAVING for the node specified by 'address'.