Clarify JoinSeedNodeProcess, see #2270
* Implemented without ScatterGatherFirstCompletedRouter, since that is more straightforward and might cause less confusion * Added more description of what it does
This commit is contained in:
parent
7b6ff9b019
commit
963c9a4e3e
1 changed files with 30 additions and 17 deletions
|
|
@ -6,11 +6,9 @@ package akka.cluster
|
|||
import scala.collection.immutable.SortedSet
|
||||
import scala.concurrent.util.{ Deadline, Duration }
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, RootActorPath, PoisonPill, Scheduler }
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler }
|
||||
import akka.actor.Status.Failure
|
||||
import akka.routing.ScatterGatherFirstCompletedRouter
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.{ AskTimeoutException, ask, pipe }
|
||||
import MemberStatus._
|
||||
|
||||
/**
|
||||
|
|
@ -222,7 +220,8 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
|
||||
self ! JoinTo(selfAddress)
|
||||
else
|
||||
context.actorOf(Props(new JoinSeedNodeProcess(environment)), "joinSeedNodeProcess")
|
||||
context.actorOf(Props(new JoinSeedNodeProcess(environment)).
|
||||
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -829,38 +828,52 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Sends InitJoinAck to all seed nodes (except itself) and expect
|
||||
* InitJoinAck reply back. The seed node that replied first
|
||||
* will be used, joined to. InitJoinAck replies received after the
|
||||
* first one are ignored.
|
||||
*
|
||||
* Retries if no InitJoinAck replies are received within the
|
||||
* SeedNodeTimeout.
|
||||
* When at least one reply has been received it stops itself after
|
||||
* an idle SeedNodeTimeout.
|
||||
*
|
||||
*/
|
||||
private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging {
|
||||
import InternalClusterAction._
|
||||
import context.dispatcher
|
||||
|
||||
def selfAddress = environment.selfAddress
|
||||
|
||||
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
|
||||
throw new IllegalArgumentException("Join seed node should not be done")
|
||||
|
||||
context.setReceiveTimeout(environment.settings.SeedNodeTimeout)
|
||||
|
||||
override def preStart(): Unit = {
|
||||
self ! InternalClusterAction.JoinSeedNode
|
||||
self ! JoinSeedNode
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case JoinSeedNode ⇒
|
||||
val seedRoutees = environment.seedNodes.collect {
|
||||
case a if a != selfAddress ⇒ context.parent.path.toStringWithAddress(a)
|
||||
// send InitJoin to all seed nodes (except myself)
|
||||
val seedRefs = environment.seedNodes.collect {
|
||||
case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a))
|
||||
}
|
||||
implicit val within = Timeout(environment.settings.SeedNodeTimeout)
|
||||
val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
|
||||
routees = seedRoutees, within = within.duration)))
|
||||
seedRouter ! InitJoin
|
||||
seedRouter ! PoisonPill
|
||||
seedRefs foreach { _ ! InitJoin }
|
||||
case InitJoinAck(address) ⇒
|
||||
// first InitJoinAck reply
|
||||
context.parent ! JoinTo(address)
|
||||
context.stop(self)
|
||||
case Failure(e: AskTimeoutException) ⇒
|
||||
// try again later, first seed node must be started before other seed nodes can join
|
||||
environment.scheduler.scheduleOnce(environment.settings.SeedNodeTimeout, self, InternalClusterAction.JoinSeedNode)
|
||||
context.become(done)
|
||||
case ReceiveTimeout ⇒
|
||||
// no InitJoinAck received, try again
|
||||
self ! JoinSeedNode
|
||||
}
|
||||
|
||||
def done: Actor.Receive = {
|
||||
case InitJoinAck(_) ⇒ // already received one, skip rest
|
||||
case ReceiveTimeout ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue