diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 2cd6dca1c5..3adbbae324 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -217,7 +217,13 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) }) override def preStart(): Unit = { - if (AutoJoin) self ! InternalClusterAction.JoinSeedNode + if (AutoJoin) { + // only the node which is named first in the list of seed nodes will join itself + if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + self ! JoinTo(selfAddress) + else + context.actorOf(Props(new JoinSeedNodeProcess(environment)), "joinSeedNodeProcess") + } } override def postStop(): Unit = { @@ -229,12 +235,9 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } def uninitialized: Actor.Receive = { - case JoinSeedNode ⇒ joinSeedNode() - case InitJoin ⇒ // skip, not ready yet - case InitJoinAck(address) ⇒ join(address) - case JoinTo(address) ⇒ join(address) - case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() - case _: Tick ⇒ // ignore periodic tasks until initialized + case InitJoin ⇒ // skip, not ready yet + case JoinTo(address) ⇒ join(address) + case _: Tick ⇒ // ignore periodic tasks until initialized } def initialized: Actor.Receive = { @@ -259,26 +262,8 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def receive = uninitialized - def joinSeedNode(): Unit = { - // only the node which is named first in the list of seed nodes will join itself - if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) - join(selfAddress) - else { - val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress ⇒ self.path.toStringWithAddress(a) } - implicit val within = Timeout(SeedNodeTimeout) - val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration))) - seedRouter ! InitJoin - seedRouter ! PoisonPill - } - } - def initJoin(): Unit = sender ! InitJoinAck(selfAddress) - def joinSeedNodeTimeout(): Unit = { - // try again later, first seed node must be started before other seed nodes can join - clusterScheduler.scheduleOnce(SeedNodeTimeout, self, JoinSeedNode) - } - /** * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. @@ -842,6 +827,42 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def ping(p: Ping): Unit = sender ! Pong(p) } +/** + * INTERNAL API. + */ +private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging { + import InternalClusterAction._ + import context.dispatcher + + def selfAddress = environment.selfAddress + + if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + throw new IllegalArgumentException("Join seed node should not be done") + + override def preStart(): Unit = { + self ! InternalClusterAction.JoinSeedNode + } + + def receive = { + case JoinSeedNode ⇒ + val seedRoutees = environment.seedNodes.collect { + case a if a != selfAddress ⇒ context.parent.path.toStringWithAddress(a) + } + implicit val within = Timeout(environment.settings.SeedNodeTimeout) + val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter( + routees = seedRoutees, within = within.duration))) + seedRouter ! InitJoin + seedRouter ! PoisonPill + case InitJoinAck(address) ⇒ + context.parent ! JoinTo(address) + context.stop(self) + case Failure(e: AskTimeoutException) ⇒ + // try again later, first seed node must be started before other seed nodes can join + environment.scheduler.scheduleOnce(environment.settings.SeedNodeTimeout, self, InternalClusterAction.JoinSeedNode) + } + +} + /** * INTERNAL API. */