=clu #13226 Prune vector clocks from removed member

This commit is contained in:
Patrik Nordwall 2015-02-11 22:16:27 +01:00
parent f4abf80f50
commit 5cf35938d0
6 changed files with 196 additions and 6 deletions

View file

@ -330,6 +330,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
*/
def registerOnMemberUp(callback: Runnable): Unit = {
  // Wrap the callback in the internal registration message and hand it to the cluster daemons.
  val registration = InternalClusterAction.AddOnMemberUpListener(callback)
  clusterDaemons ! registration
}
/**
* The supplied thunk will be run, once, when the current cluster member is `Removed`.
* If the cluster has already been shut down, the thunk will run on the caller thread immediately.

View file

@ -510,6 +510,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
else {
logInfo("Welcome from [{}]", from.address)
latestGossip = gossip seen selfUniqueAddress
assertLatestGossip()
publish(latestGossip)
if (from != selfUniqueAddress)
gossipTo(from, sender())
@ -657,10 +658,30 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
(remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer)
case _
// conflicting versions, merge
(remoteGossip merge localGossip, true, Merge)
// We can see that a removal was done when a member is missing from one of the gossips
// and has status Down or Exiting in the other gossip.
// Perform the same pruning (clear of VectorClock) as the leader did when removing a member.
// Removal of member itself is handled in merge (pickHighestPriority)
val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m)
if (Gossip.removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) {
log.debug("Cluster Node [{}] - Pruned conflicting local gossip: {}", selfAddress, m)
g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
} else
g
}
val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m)
if (Gossip.removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) {
log.debug("Cluster Node [{}] - Pruned conflicting remote gossip: {}", selfAddress, m)
g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
} else
g
}
(prunedRemoteGossip merge prunedLocalGossip, true, Merge)
}
latestGossip = winningGossip seen selfUniqueAddress
assertLatestGossip()
// for all new joining nodes we remove them from the failure detector
latestGossip.members foreach {
@ -886,7 +907,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// removing REMOVED nodes from the `reachability` table
val newReachability = localOverview.reachability.remove(removed)
val newOverview = localOverview copy (seen = newSeen, reachability = newReachability)
val newGossip = localGossip copy (members = newMembers, overview = newOverview)
// Clear the VectorClock when member is removed. The change made by the leader is stamped
// and will propagate as is if there are no other changes on other nodes.
// If there are other concurrent changes on other nodes (e.g. join), the pruning is also
// taken care of when receiving gossips.
val newVersion = removed.foldLeft(localGossip.version) { (v, node)
v.prune(VectorClock.Node(vclockName(node)))
}
val newGossip = localGossip copy (members = newMembers, overview = newOverview, version = newVersion)
updateLatestGossip(newGossip)
@ -1016,8 +1044,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress)
// Update the state with the new gossip
latestGossip = seenVersionedGossip
assertLatestGossip()
}
/**
 * Invariant check that is active only when `Cluster.isAssertInvariantsEnabled` is set.
 * Throws an `IllegalStateException` when the vector clock of `latestGossip` holds more
 * entries than there are members in `latestGossip`.
 */
def assertLatestGossip(): Unit =
  if (Cluster.isAssertInvariantsEnabled) {
    val clockEntries = latestGossip.version.versions.size
    val memberCount = latestGossip.members.size
    if (clockEntries > memberCount)
      throw new IllegalStateException(s"Too many vector clock entries in gossip state ${latestGossip}")
  }
def publish(newGossip: Gossip): Unit = {
publisher ! PublishChanges(newGossip)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()

View file

@ -213,6 +213,12 @@ private[cluster] final case class Gossip(
members.maxBy(m if (m.upNumber == Int.MaxValue) 0 else m.upNumber)
}
/**
 * Removes the entry of `removedNode` from this gossip's `version` vector clock.
 * Returns this very instance when pruning left the clock untouched (checked by
 * reference equality), otherwise a copy carrying the pruned clock.
 */
def prune(removedNode: VectorClock.Node): Gossip =
  version.prune(removedNode) match {
    case untouched if untouched eq version => this
    case prunedVersion                     => copy(version = prunedVersion)
  }
override def toString =
s"Gossip(members = [${members.mkString(", ")}], overview = ${overview}, version = ${version})"
}

View file

@ -187,5 +187,11 @@ final case class VectorClock(
VectorClock(mergedVersions)
}
/**
 * Drops the entry for `removedNode` from `versions`.
 * When the node has no entry, this same instance is returned instead of a copy.
 */
def prune(removedNode: Node): VectorClock = {
  val hasEntry = versions.contains(removedNode)
  if (!hasEntry) this
  else copy(versions = versions - removedNode)
}
override def toString = versions.map { case ((n, t)) n + " -> " + t }.mkString("VectorClock(", ", ", ")")
}

View file

@ -0,0 +1,122 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable
import scala.language.postfixOps
import scala.concurrent.duration._
import akka.actor.Address
import akka.cluster.MemberStatus._
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.event.Logging.Info
import akka.actor.Actor
import akka.actor.Props
/**
 * Multi-node configuration shared by the `NodeChurnSpec` nodes.
 *
 * Declares the three roles `first`, `second` and `third`, and layers test-specific
 * settings on top of `MultiNodeClusterSpec.clusterConfig`.
 */
object NodeChurnMultiJvmSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")

  // log-frame-size-exceeding is set low (2000b) so that a growing gossip payload
  // shows up as an Info log event that LogListener can forward to the test.
  commonConfig(debugConfig(on = false).
    withFallback(ConfigFactory.parseString("""
      akka.cluster.auto-down-unreachable-after = 1s
      akka.remote.log-frame-size-exceeding = 2000b
      """)).
    withFallback(MultiNodeClusterSpec.clusterConfig))

  /**
   * Forwards to `testActor` the message text of every `Info` log event that reports
   * a new maximum payload size for `akka.cluster.GossipEnvelope`; everything else is ignored.
   */
  class LogListener(testActor: ActorRef) extends Actor {
    def receive = {
      case Info(_, _, msg: String) if msg.startsWith("New maximum payload size for [akka.cluster.GossipEnvelope]") =>
        testActor ! msg
      case _ => // not a payload-size notification: ignore
    }
  }
}
// Concrete spec classes; all three run the same NodeChurnSpec.
// NOTE(review): presumably one class per participating node/JVM (first, second, third),
// following the MultiJvmNodeN naming convention — confirm against the multi-jvm test setup.
class NodeChurnMultiJvmNode1 extends NodeChurnSpec
class NodeChurnMultiJvmNode2 extends NodeChurnSpec
class NodeChurnMultiJvmNode3 extends NodeChurnSpec
/**
 * Verifies that repeatedly joining and removing short-lived members does not make the
 * gossip payload grow, i.e. that vector clock entries of removed members are pruned.
 *
 * Each of the stable nodes starts extra actor systems per round, lets them join the
 * cluster, removes them again (alternating between `leave` and `down`) and then expects
 * that the `LogListener` did not report a new maximum gossip payload size.
 */
abstract class NodeChurnSpec
  extends MultiNodeSpec(NodeChurnMultiJvmSpec)
  with MultiNodeClusterSpec with ImplicitSender {

  import NodeChurnMultiJvmSpec._

  def seedNodes: immutable.IndexedSeq[Address] = Vector(first, second, third)

  override def afterAll(): Unit = {
    super.afterAll()
  }

  val rounds = 3

  override def expectedTestDuration: FiniteDuration = 45.seconds * rounds

  /**
   * Waits until the stable nodes and all transient systems are `Up`, both as seen from
   * this node's cluster and from every system in `additionaSystems`.
   *
   * @param additionaSystems the transient actor systems started by this node
   */
  def awaitAllMembersUp(additionaSystems: Vector[ActorSystem]): Unit = {
    // every role starts the same number of additional systems
    val numberOfMembers = roles.size + roles.size * additionaSystems.size
    awaitMembersUp(numberOfMembers)
    awaitAssert {
      additionaSystems.foreach { s =>
        val c = Cluster(s)
        c.state.members.size should be(numberOfMembers)
        // The result of forall must be asserted; a bare Boolean expression is discarded
        // and would never fail the awaitAssert.
        c.state.members.forall(_.status == MemberStatus.Up) should be(true)
      }
    }
  }

  /**
   * Waits until only the stable nodes remain as members and the cluster extension of
   * every system in `additionaSystems` reports `isTerminated`.
   *
   * @param additionaSystems the transient actor systems started by this node
   */
  def awaitRemoved(additionaSystems: Vector[ActorSystem]): Unit = {
    awaitMembersUp(roles.size, timeout = 40.seconds)
    awaitAssert {
      additionaSystems.foreach { s =>
        Cluster(s).isTerminated should be(true)
      }
    }
  }

  "Cluster with short lived members" must {

    "setup stable nodes" taggedAs LongRunningTest in within(15.seconds) {
      val logListener = system.actorOf(Props(classOf[LogListener], testActor), "logListener")
      system.eventStream.subscribe(logListener, classOf[Info])
      cluster.joinSeedNodes(seedNodes)
      awaitMembersUp(roles.size)
    }

    "join and remove transient nodes without growing gossip payload" taggedAs LongRunningTest in {
      // This test is configured with log-frame-size-exceeding, and the LogListener
      // sends a message to the testActor on an unexpected increase in message payload size.
      // The test will fail after a while if vector clock entries of removed nodes are not pruned.
      for (n <- 1 to rounds) {
        log.info("round-" + n)
        val systems = Vector.fill(5)(ActorSystem(system.name, system.settings.config))
        systems.foreach { s =>
          muteDeadLetters()(s)
          Cluster(s).joinSeedNodes(seedNodes)
        }
        awaitAllMembersUp(systems)
        enterBarrier("members-up-" + n)

        // alternate the removal path between rounds: down on even rounds, leave on odd rounds
        systems.foreach { node =>
          if (n % 2 == 0)
            Cluster(node).down(Cluster(node).selfAddress)
          else
            Cluster(node).leave(Cluster(node).selfAddress)
        }
        awaitRemoved(systems)
        enterBarrier("members-removed-" + n)
        systems.foreach(_.terminate().await)
        log.info("end of round-" + n)
        // the log listener sends to testActor if the payload size exceeds the configured log-frame-size-exceeding
        expectNoMsg(2.seconds)
      }
      expectNoMsg(5.seconds)
    }

  }
}

View file

@ -54,7 +54,7 @@ class VectorClockSpec extends AkkaSpec {
}
"pass misc comparison test 3" in {
var clock1_1 = VectorClock()
val clock1_1 = VectorClock()
val clock2_1 = clock1_1 :+ Node("1")
val clock1_2 = VectorClock()
@ -239,12 +239,34 @@ class VectorClockSpec extends AkkaSpec {
val a1 = a :+ node1
val b1 = b :+ node2
var a2 = a1 :+ node1
var c = a2.merge(b1)
var c1 = c :+ node3
val a2 = a1 :+ node1
val c = a2.merge(b1)
val c1 = c :+ node3
(c1 > a2) should ===(true)
(c1 > b1) should ===(true)
}
"support pruning" in {
  val node1 = Node("1")
  val node2 = Node("2")
  val node3 = Node("3")

  // Two fresh clocks, each incremented by a different node, merged into one.
  val fromNode1 = VectorClock() :+ node1
  val fromNode2 = VectorClock() :+ node2
  val c = fromNode1.merge(fromNode2)

  // Prune node1 from the merged clock, then increment for node3.
  val prunedC = c.prune(node1)
  val c1 = prunedC :+ node3
  c1.versions.contains(node1) should be(false)
  (c1 <> c) should be(true)

  // Merging the pruned clocks must not bring the pruned node back.
  val mergedAfterPrune = prunedC merge c1
  mergedAfterPrune.versions.contains(node1) should be(false)

  val c2 = c :+ node2
  (c1 <> c2) should be(true)
}
}
}