Join seed node in separate actor, see #2270

This commit is contained in:
Patrik Nordwall 2012-08-14 17:26:33 +02:00
parent d7b0089d7e
commit 4f1f900e40

View file

@ -217,7 +217,13 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
})
override def preStart(): Unit = {
if (AutoJoin) self ! InternalClusterAction.JoinSeedNode
if (AutoJoin) {
// only the node which is named first in the list of seed nodes will join itself
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
self ! JoinTo(selfAddress)
else
context.actorOf(Props(new JoinSeedNodeProcess(environment)), "joinSeedNodeProcess")
}
}
override def postStop(): Unit = {
@ -229,12 +235,9 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
}
def uninitialized: Actor.Receive = {
case JoinSeedNode joinSeedNode()
case InitJoin // skip, not ready yet
case InitJoinAck(address) join(address)
case JoinTo(address) join(address)
case Failure(e: AskTimeoutException) joinSeedNodeTimeout()
case _: Tick // ignore periodic tasks until initialized
case InitJoin // skip, not ready yet
case JoinTo(address) join(address)
case _: Tick // ignore periodic tasks until initialized
}
def initialized: Actor.Receive = {
@ -259,26 +262,8 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def receive = uninitialized
def joinSeedNode(): Unit = {
// only the node which is named first in the list of seed nodes will join itself
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
join(selfAddress)
else {
val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress self.path.toStringWithAddress(a) }
implicit val within = Timeout(SeedNodeTimeout)
val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration)))
seedRouter ! InitJoin
seedRouter ! PoisonPill
}
}
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
def joinSeedNodeTimeout(): Unit = {
// try again later, first seed node must be started before other seed nodes can join
clusterScheduler.scheduleOnce(SeedNodeTimeout, self, JoinSeedNode)
}
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
@ -842,6 +827,42 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def ping(p: Ping): Unit = sender ! Pong(p)
}
/**
* INTERNAL API.
*/
private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging {
import InternalClusterAction._
import context.dispatcher
def selfAddress = environment.selfAddress
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
throw new IllegalArgumentException("Join seed node should not be done")
override def preStart(): Unit = {
self ! InternalClusterAction.JoinSeedNode
}
def receive = {
case JoinSeedNode
val seedRoutees = environment.seedNodes.collect {
case a if a != selfAddress context.parent.path.toStringWithAddress(a)
}
implicit val within = Timeout(environment.settings.SeedNodeTimeout)
val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
routees = seedRoutees, within = within.duration)))
seedRouter ! InitJoin
seedRouter ! PoisonPill
case InitJoinAck(address)
context.parent ! JoinTo(address)
context.stop(self)
case Failure(e: AskTimeoutException)
// try again later, first seed node must be started before other seed nodes can join
environment.scheduler.scheduleOnce(environment.settings.SeedNodeTimeout, self, InternalClusterAction.JoinSeedNode)
}
}
/**
* INTERNAL API.
*/