Join seed nodes before becoming singleton cluster, see #2267
* self is initially not member (in gossip state) * if the join to seed nodes timeout it joins itself, and becomes singleton cluster * remove the special case handling of singelton cluster in gossip merge, since singleton cluster is not the normal state when joining any more
This commit is contained in:
parent
df7f558c2a
commit
25996bf284
7 changed files with 53 additions and 30 deletions
|
|
@ -364,20 +364,23 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
|
|||
val log = Logging(context.system, this)
|
||||
|
||||
def receive = {
|
||||
case JoinSeedNode ⇒ joinSeedNode()
|
||||
case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress)
|
||||
case InitJoinAck(address) ⇒ cluster.join(address)
|
||||
case Join(address) ⇒ cluster.joining(address)
|
||||
case Down(address) ⇒ cluster.downing(address)
|
||||
case Leave(address) ⇒ cluster.leaving(address)
|
||||
case Exit(address) ⇒ cluster.exiting(address)
|
||||
case Remove(address) ⇒ cluster.removing(address)
|
||||
case JoinSeedNode ⇒ joinSeedNode()
|
||||
case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress)
|
||||
case InitJoinAck(address) ⇒ cluster.join(address)
|
||||
case Join(address) ⇒ cluster.joining(address)
|
||||
case Down(address) ⇒ cluster.downing(address)
|
||||
case Leave(address) ⇒ cluster.leaving(address)
|
||||
case Exit(address) ⇒ cluster.exiting(address)
|
||||
case Remove(address) ⇒ cluster.removing(address)
|
||||
case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout()
|
||||
}
|
||||
|
||||
def joinSeedNode(): Unit = {
|
||||
val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress)
|
||||
yield self.path.toStringWithAddress(address)
|
||||
if (seedRoutees.nonEmpty) {
|
||||
if (seedRoutees.isEmpty) {
|
||||
cluster join cluster.selfAddress
|
||||
} else {
|
||||
implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout)
|
||||
val seedRouter = context.actorOf(
|
||||
Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
|
||||
|
|
@ -387,6 +390,8 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
|
|||
}
|
||||
}
|
||||
|
||||
def joinSeedNodeTimeout(): Unit = cluster join cluster.selfAddress
|
||||
|
||||
override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
|
||||
}
|
||||
|
||||
|
|
@ -534,10 +539,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
private val state = {
|
||||
val member = Member(selfAddress, Joining)
|
||||
val versionedGossip = Gossip(members = Gossip.emptyMembers + member) :+ vclockNode // add me as member and update my vector clock
|
||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
||||
new AtomicReference[State](State(seenVersionedGossip))
|
||||
// note that self is not initially member,
|
||||
// and the Gossip is not versioned for this 'Node' yet
|
||||
new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers)))
|
||||
}
|
||||
|
||||
// try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
|
||||
|
|
@ -797,7 +801,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
val newUnreachableMembers = localUnreachable filterNot { _.address == node }
|
||||
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
|
||||
|
||||
val newMembers = localMembers + Member(node, Joining) // add joining node as Joining
|
||||
// add joining node as Joining
|
||||
// add self in case someone else joins before self has joined (Set discards duplicates)
|
||||
val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
|
||||
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
|
||||
|
||||
val versionedGossip = newGossip :+ vclockNode
|
||||
|
|
@ -939,11 +945,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
if (!localGossip.overview.isNonDownUnreachable(from)) {
|
||||
|
||||
val winningGossip =
|
||||
if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
|
||||
// a fresh singleton cluster that is joining, no need to merge, use received gossip
|
||||
remoteGossip
|
||||
|
||||
} else if (remoteGossip.version <> localGossip.version) {
|
||||
if (remoteGossip.version <> localGossip.version) {
|
||||
// concurrent
|
||||
val mergedGossip = remoteGossip merge localGossip
|
||||
val versionedMergedGossip = mergedGossip :+ vclockNode
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue