diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index ff954a83cf..70a6477087 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -6,11 +6,9 @@ package akka.cluster import scala.collection.immutable.SortedSet import scala.concurrent.util.{ Deadline, Duration } import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, RootActorPath, PoisonPill, Scheduler } +import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler } import akka.actor.Status.Failure -import akka.routing.ScatterGatherFirstCompletedRouter import akka.util.Timeout -import akka.pattern.{ AskTimeoutException, ask, pipe } import MemberStatus._ /** @@ -222,7 +220,8 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) self ! JoinTo(selfAddress) else - context.actorOf(Props(new JoinSeedNodeProcess(environment)), "joinSeedNodeProcess") + context.actorOf(Props(new JoinSeedNodeProcess(environment)). + withDispatcher(UseDispatcher), name = "joinSeedNodeProcess") } } @@ -829,38 +828,52 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) /** * INTERNAL API. + * + * Sends InitJoinAck to all seed nodes (except itself) and expect + * InitJoinAck reply back. The seed node that replied first + * will be used, joined to. InitJoinAck replies received after the + * first one are ignored. + * + * Retries if no InitJoinAck replies are received within the + * SeedNodeTimeout. + * When at least one reply has been received it stops itself after + * an idle SeedNodeTimeout. + * */ private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging { import InternalClusterAction._ - import context.dispatcher def selfAddress = environment.selfAddress if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) throw new IllegalArgumentException("Join seed node should not be done") + context.setReceiveTimeout(environment.settings.SeedNodeTimeout) + override def preStart(): Unit = { - self ! InternalClusterAction.JoinSeedNode + self ! JoinSeedNode } def receive = { case JoinSeedNode ⇒ - val seedRoutees = environment.seedNodes.collect { - case a if a != selfAddress ⇒ context.parent.path.toStringWithAddress(a) + // send InitJoin to all seed nodes (except myself) + val seedRefs = environment.seedNodes.collect { + case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a)) } - implicit val within = Timeout(environment.settings.SeedNodeTimeout) - val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter( - routees = seedRoutees, within = within.duration))) - seedRouter ! InitJoin - seedRouter ! PoisonPill + seedRefs foreach { _ ! InitJoin } case InitJoinAck(address) ⇒ + // first InitJoinAck reply context.parent ! JoinTo(address) - context.stop(self) - case Failure(e: AskTimeoutException) ⇒ - // try again later, first seed node must be started before other seed nodes can join - environment.scheduler.scheduleOnce(environment.settings.SeedNodeTimeout, self, InternalClusterAction.JoinSeedNode) + context.become(done) + case ReceiveTimeout ⇒ + // no InitJoinAck received, try again + self ! JoinSeedNode } + def done: Actor.Receive = { + case InitJoinAck(_) ⇒ // already received one, skip rest + case ReceiveTimeout ⇒ context.stop(self) + } } /**