Nodes not part of cluster have marked the Gossip as seen, see #3031
* Problem may occur when joining member with same hostname:port again, after downing. * Reproduced with StressSpec exerciseJoinRemove with fixed port that joins and shutdown several times. * Real solution for this will be covered by ticket #2788 by adding uid to member identifier, but as first step we need to support this scenario with current design. * Use unique node identifier for vector clock to avoid mixup of old and new member instance. * Support transition from Down to Joining in Gossip merge * Don't gossip to unknown or unreachable members.
This commit is contained in:
parent
cab78e5174
commit
b349ad8d87
4 changed files with 28 additions and 8 deletions
|
|
@ -9,6 +9,7 @@ import scala.collection.immutable
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
import java.util.UUID
|
||||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler }
|
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler }
|
||||||
import akka.actor.OneForOneStrategy
|
import akka.actor.OneForOneStrategy
|
||||||
import akka.actor.Status.Failure
|
import akka.actor.Status.Failure
|
||||||
|
|
@ -219,7 +220,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
||||||
import cluster.{ selfAddress, scheduler, failureDetector }
|
import cluster.{ selfAddress, scheduler, failureDetector }
|
||||||
import cluster.settings._
|
import cluster.settings._
|
||||||
|
|
||||||
val vclockNode = VectorClock.Node(selfAddress.toString)
|
// FIXME the UUID should not be needed when Address contains uid, ticket #2788
|
||||||
|
val vclockNode = VectorClock.Node(selfAddress.toString + "-" + UUID.randomUUID())
|
||||||
|
|
||||||
// note that self is not initially member,
|
// note that self is not initially member,
|
||||||
// and the Gossip is not versioned for this 'Node' yet
|
// and the Gossip is not versioned for this 'Node' yet
|
||||||
|
|
@ -507,10 +509,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
||||||
val localGossip = latestGossip
|
val localGossip = latestGossip
|
||||||
|
|
||||||
if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
|
if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
|
||||||
// FIXME how should we handle this situation?
|
log.debug("Ignoring received gossip with self [{}] as unreachable, from [{}]", selfAddress, from)
|
||||||
log.debug("Received gossip with self as unreachable, from [{}]", from)
|
} else if (localGossip.overview.isNonDownUnreachable(from)) {
|
||||||
|
log.debug("Ignoring received gossip from unreachable [{}] ", from)
|
||||||
} else if (!localGossip.overview.isNonDownUnreachable(from)) {
|
} else {
|
||||||
|
|
||||||
// leader handles merge conflicts, or when they have different views of how is leader
|
// leader handles merge conflicts, or when they have different views of how is leader
|
||||||
val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
|
val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
|
||||||
|
|
@ -830,7 +832,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
||||||
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))
|
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))
|
||||||
|
|
||||||
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit =
|
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit =
|
||||||
if (address != selfAddress) clusterCore(address) ! gossipMsg
|
if (address != selfAddress && gossipMsg.gossip.members.exists(_.address == address))
|
||||||
|
clusterCore(address) ! gossipMsg
|
||||||
|
|
||||||
def publish(newGossip: Gossip): Unit = {
|
def publish(newGossip: Gossip): Unit = {
|
||||||
publisher ! PublishChanges(newGossip)
|
publisher ! PublishChanges(newGossip)
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,8 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
||||||
state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member)
|
state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member)
|
||||||
case event: MemberEvent ⇒
|
case event: MemberEvent ⇒
|
||||||
// replace current member with new member (might have different status, only address is used in equals)
|
// replace current member with new member (might have different status, only address is used in equals)
|
||||||
state = state.copy(members = state.members - event.member + event.member)
|
state = state.copy(members = state.members - event.member + event.member,
|
||||||
|
unreachable = state.unreachable - event.member)
|
||||||
case LeaderChanged(leader) ⇒ state = state.copy(leader = leader)
|
case LeaderChanged(leader) ⇒ state = state.copy(leader = leader)
|
||||||
case s: CurrentClusterState ⇒ state = s
|
case s: CurrentClusterState ⇒ state = s
|
||||||
case CurrentInternalStats(stats) ⇒ _latestStats = stats
|
case CurrentInternalStats(stats) ⇒ _latestStats = stats
|
||||||
|
|
|
||||||
|
|
@ -123,7 +123,12 @@ private[cluster] case class Gossip(
|
||||||
val mergedVClock = this.version merge that.version
|
val mergedVClock = this.version merge that.version
|
||||||
|
|
||||||
// 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
|
// 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
|
||||||
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
|
// FIXME allowing Down -> Joining should be adjusted as part of ticket #2788
|
||||||
|
val mergedUnreachable = Member.pickHighestPriority(
|
||||||
|
this.overview.unreachable.filterNot(m1 ⇒
|
||||||
|
m1.status == Down && that.members.exists(m2 ⇒ m2.status == Joining && m2.address == m1.address)),
|
||||||
|
that.overview.unreachable.filterNot(m1 ⇒
|
||||||
|
m1.status == Down && this.members.exists(m2 ⇒ m2.status == Joining && m2.address == m1.address)))
|
||||||
|
|
||||||
// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
|
// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
|
||||||
// and exclude unreachable
|
// and exclude unreachable
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ class GossipSpec extends WordSpec with MustMatchers {
|
||||||
val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed)
|
val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed)
|
||||||
val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining)
|
val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining)
|
||||||
val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up)
|
val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up)
|
||||||
|
val e3 = Member(Address("akka.tcp", "sys", "e", 2552), Down)
|
||||||
|
|
||||||
"A Gossip" must {
|
"A Gossip" must {
|
||||||
|
|
||||||
|
|
@ -78,6 +79,16 @@ class GossipSpec extends WordSpec with MustMatchers {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"merge by allowing Down -> Joining" in {
|
||||||
|
val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(e3)))
|
||||||
|
val g2 = Gossip(members = SortedSet(a1, b1, e1), overview = GossipOverview(unreachable = Set.empty))
|
||||||
|
|
||||||
|
val merged2 = g2 merge g1
|
||||||
|
merged2.members must be(SortedSet(a1, b1, e1))
|
||||||
|
merged2.members.toSeq.map(_.status) must be(Seq(Up, Up, Joining))
|
||||||
|
merged2.overview.unreachable must be(Set.empty)
|
||||||
|
}
|
||||||
|
|
||||||
"start with fresh seen table after merge" in {
|
"start with fresh seen table after merge" in {
|
||||||
val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address)
|
val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address)
|
||||||
val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address)
|
val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue