Merge pull request #1443 from akka/wip-3359-auto-join-patriknw

Remove auto-join config, derive from seed-nodes, see #3359
This commit is contained in:
Patrik Nordwall 2013-05-17 04:57:07 -07:00
commit 8f04b53ac7
25 changed files with 53 additions and 94 deletions

View file

@ -260,13 +260,16 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// start periodic publish of current stats
val publishStatsTask: Option[Cancellable] = PublishStatsInterval match {
case Duration.Zero | Duration.Undefined | Duration.Inf None
case Duration.Zero | _: Duration.Infinite None
case d: FiniteDuration
Some(scheduler.schedule(PeriodicTasksInitialDelay.max(d), d, self, PublishStatsTick))
}
override def preStart(): Unit = {
if (AutoJoin) self ! JoinSeedNodes(SeedNodes)
if (SeedNodes.isEmpty)
log.info("No seed-nodes configured, manual cluster join required")
else
self ! JoinSeedNodes(SeedNodes)
}
override def postStop(): Unit = {
@ -296,7 +299,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case _: Tick
if (deadline.exists(_.isOverdue)) {
context.become(uninitialized)
if (AutoJoin) joinSeedNodes(SeedNodes)
if (SeedNodes.nonEmpty) joinSeedNodes(SeedNodes)
else join(joinWith)
}
}
@ -336,17 +339,19 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
require(seedNodeProcess.isEmpty, "Join seed nodes is already in progress")
seedNodeProcess =
if (seedNodes.isEmpty || seedNodes == immutable.IndexedSeq(selfAddress)) {
self ! ClusterUserAction.JoinTo(selfAddress)
None
} else if (seedNodes.head == selfAddress) {
Some(context.actorOf(Props(classOf[FirstSeedNodeProcess], seedNodes).
withDispatcher(UseDispatcher), name = "firstSeedNodeProcess"))
} else {
Some(context.actorOf(Props(classOf[JoinSeedNodeProcess], seedNodes).
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess"))
}
if (seedNodes.nonEmpty) {
seedNodeProcess =
if (seedNodes == immutable.IndexedSeq(selfAddress)) {
self ! ClusterUserAction.JoinTo(selfAddress)
None
} else if (seedNodes.head == selfAddress) {
Some(context.actorOf(Props(classOf[FirstSeedNodeProcess], seedNodes).
withDispatcher(UseDispatcher), name = "firstSeedNodeProcess"))
} else {
Some(context.actorOf(Props(classOf[JoinSeedNodeProcess], seedNodes).
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess"))
}
}
}
/**
@ -383,8 +388,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
joining(selfUniqueAddress, cluster.selfRoles)
} else {
val joinDeadline = RetryUnsuccessfulJoinAfter match {
case Duration.Undefined | Duration.Inf None
case d: FiniteDuration Some(Deadline.now + d)
case d: FiniteDuration Some(Deadline.now + d)
case _ None
}
context.become(tryingToJoin(address, joinDeadline))
clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles)