diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 69fd6b0e75..bed87d4a93 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -330,6 +330,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
    */
   def registerOnMemberUp(callback: Runnable): Unit =
     clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)
+
   /**
    * The supplied thunk will be run, once, when the current cluster member is `Removed`.
    * If the cluster has already been shut down the thunk will run on the caller thread immediately.
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 77e5acd656..8a84e7644c 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -510,6 +510,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     else {
       logInfo("Welcome from [{}]", from.address)
       latestGossip = gossip seen selfUniqueAddress
+      assertLatestGossip()
       publish(latestGossip)
       if (from != selfUniqueAddress)
         gossipTo(from, sender())
@@ -657,10 +658,30 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
           (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer)
         case _ ⇒
           // conflicting versions, merge
-          (remoteGossip merge localGossip, true, Merge)
+          // We can see that a removal was done when a member is missing from one of the
+          // gossips but has status Down or Exiting in the other gossip.
+          // Perform the same pruning (clear of VectorClock) as the leader did when removing a member.
+          // Removal of the member itself is handled in merge (pickHighestPriority).
+          val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m) ⇒
+            if (Gossip.removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) {
+              log.debug("Cluster Node [{}] - Pruned conflicting local gossip: {}", selfAddress, m)
+              g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
+            } else
+              g
+          }
+          val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m) ⇒
+            if (Gossip.removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) {
+              log.debug("Cluster Node [{}] - Pruned conflicting remote gossip: {}", selfAddress, m)
+              g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
+            } else
+              g
+          }
+
+          (prunedRemoteGossip merge prunedLocalGossip, true, Merge)
       }
 
       latestGossip = winningGossip seen selfUniqueAddress
+      assertLatestGossip()
 
       // for all new joining nodes we remove them from the failure detector
       latestGossip.members foreach {
@@ -886,7 +907,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
       // removing REMOVED nodes from the `reachability` table
       val newReachability = localOverview.reachability.remove(removed)
       val newOverview = localOverview copy (seen = newSeen, reachability = newReachability)
-      val newGossip = localGossip copy (members = newMembers, overview = newOverview)
+      // Clear the VectorClock when a member is removed. The change made by the leader is stamped
+      // and will propagate as is if there are no other changes on other nodes.
+      // If there are other concurrent changes on other nodes (e.g. join) the pruning is also
+      // taken care of when receiving gossips.
+      val newVersion = removed.foldLeft(localGossip.version) { (v, node) ⇒
+        v.prune(VectorClock.Node(vclockName(node)))
+      }
+      val newGossip = localGossip copy (members = newMembers, overview = newOverview, version = newVersion)
 
       updateLatestGossip(newGossip)
 
@@ -1016,8 +1044,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress)
     // Update the state with the new gossip
     latestGossip = seenVersionedGossip
+    assertLatestGossip()
   }
 
+  def assertLatestGossip(): Unit =
+    if (Cluster.isAssertInvariantsEnabled && latestGossip.version.versions.size > latestGossip.members.size)
+      throw new IllegalStateException(s"Too many vector clock entries in gossip state ${latestGossip}")
+
   def publish(newGossip: Gossip): Unit = {
     publisher ! PublishChanges(newGossip)
     if (PublishStatsInterval == Duration.Zero) publishInternalStats()
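The two ClusterDaemon changes above work together: the leader prunes a removed member's entry from the gossip version vector at removal time, and `receiveGossip` repeats the pruning when it has to merge conflicting gossips. A minimal standalone sketch of why the prune step matters, using a toy version vector rather than the real `VectorClock`/`Gossip` classes (all names here are made up for illustration):

```scala
// Toy version vector: one counter per node that has ever bumped it.
object PruneSketch extends App {
  type Node = String
  type VersionVector = Map[Node, Long]

  def bump(v: VersionVector, n: Node): VersionVector =
    v.updated(n, v.getOrElse(n, 0L) + 1L)

  // what the leader now does for each removed member
  def prune(v: VersionVector, removed: Node): VersionVector =
    v - removed

  // three transient nodes join and touch the vector
  val grown = List("a", "b", "c").foldLeft(Map.empty[Node, Long])(bump)
  println(grown)                                 // Map(a -> 1, b -> 1, c -> 1)

  // without pruning the vector keeps an entry per node forever; with it,
  // removing "b" and "c" shrinks the state that every gossip must carry
  println(List("b", "c").foldLeft(grown)(prune)) // Map(a -> 1)
}
```

Under node churn (exercised by the NodeChurnSpec added below) the unpruned vector, and with it the serialized GossipEnvelope payload, grows without bound; that is the regression this commit fixes.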
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
index d7deef059a..c130b1d442 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
@@ -213,6 +213,12 @@ private[cluster] final case class Gossip(
     members.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber)
   }
 
+  def prune(removedNode: VectorClock.Node): Gossip = {
+    val newVersion = version.prune(removedNode)
+    if (newVersion eq version) this
+    else copy(version = newVersion)
+  }
+
   override def toString =
     s"Gossip(members = [${members.mkString(", ")}], overview = ${overview}, version = ${version})"
 }
diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
index 2a4a570bbc..7f30d1bf9b 100644
--- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
@@ -187,5 +187,11 @@ final case class VectorClock(
     VectorClock(mergedVersions)
   }
 
+  def prune(removedNode: Node): VectorClock =
+    if (versions.contains(removedNode))
+      copy(versions = versions - removedNode)
+    else
+      this
+
   override def toString = versions.map { case ((n, t)) ⇒ n + " -> " + t }.mkString("VectorClock(", ", ", ")")
 }
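The merge of two vector clocks keeps the pointwise maximum of the entries, so merging a pruned clock with a clock that has not yet seen the removal silently resurrects the pruned entry. That is why the `receiveGossip` conflict branch above prunes both the local and the remote gossip before merging, rather than relying on the leader-side prune alone. A sketch with the same toy version vector as before (made-up names, not the real API):

```scala
object MergeResurrectionSketch extends App {
  type VersionVector = Map[String, Long]

  // pointwise maximum, like VectorClock.merge
  def merge(a: VersionVector, b: VersionVector): VersionVector =
    (a.keySet ++ b.keySet).map(n ⇒ n -> math.max(a.getOrElse(n, 0L), b.getOrElse(n, 0L))).toMap

  val local  = Map("a" -> 1L, "removed" -> 3L) // has not observed the removal yet
  val remote = Map("a" -> 2L)                  // the leader already pruned "removed"

  println(merge(local, remote))                // Map(a -> 2, removed -> 3): entry resurrected
  println(merge(local - "removed", remote))    // Map(a -> 2): prune both sides before merging
}
```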
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala
new file mode 100644
index 0000000000..2eb7d31eba
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeChurnSpec.scala
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2009-2014 Typesafe Inc.
+ */
+package akka.cluster
+
+import scala.collection.immutable
+import scala.language.postfixOps
+import scala.concurrent.duration._
+import akka.actor.Address
+import akka.cluster.MemberStatus._
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfter
+import akka.actor.ActorSystem
+import akka.actor.ActorRef
+import akka.event.Logging.Info
+import akka.actor.Actor
+import akka.actor.Props
+
+object NodeChurnMultiJvmSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+
+  commonConfig(debugConfig(on = false).
+    withFallback(ConfigFactory.parseString("""
+      akka.cluster.auto-down-unreachable-after = 1s
+      akka.remote.log-frame-size-exceeding = 2000b
+      """)).
+    withFallback(MultiNodeClusterSpec.clusterConfig))
+
+  class LogListener(testActor: ActorRef) extends Actor {
+    def receive = {
+      case Info(_, _, msg: String) if msg.startsWith("New maximum payload size for [akka.cluster.GossipEnvelope]") ⇒
+        testActor ! msg
+      case _ ⇒
+    }
+  }
+}
+
+class NodeChurnMultiJvmNode1 extends NodeChurnSpec
+class NodeChurnMultiJvmNode2 extends NodeChurnSpec
+class NodeChurnMultiJvmNode3 extends NodeChurnSpec
+
+abstract class NodeChurnSpec
+  extends MultiNodeSpec(NodeChurnMultiJvmSpec)
+  with MultiNodeClusterSpec with ImplicitSender {
+
+  import NodeChurnMultiJvmSpec._
+
+  def seedNodes: immutable.IndexedSeq[Address] = Vector(first, second, third)
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+  }
+
+  val rounds = 3
+
+  override def expectedTestDuration: FiniteDuration = 45.seconds * rounds
+
+  def awaitAllMembersUp(additionaSystems: Vector[ActorSystem]): Unit = {
+    val numberOfMembers = roles.size + roles.size * additionaSystems.size
+    awaitMembersUp(numberOfMembers)
+    awaitAssert {
+      additionaSystems.foreach { s ⇒
+        val c = Cluster(s)
+        c.state.members.size should be(numberOfMembers)
+        c.state.members.forall(_.status == MemberStatus.Up) should be(true)
+      }
+    }
+  }
+
+  def awaitRemoved(additionaSystems: Vector[ActorSystem]): Unit = {
+    awaitMembersUp(roles.size, timeout = 40.seconds)
+    awaitAssert {
+      additionaSystems.foreach { s ⇒
+        Cluster(s).isTerminated should be(true)
+      }
+    }
+  }
+
+  "Cluster with short lived members" must {
+    "setup stable nodes" taggedAs LongRunningTest in within(15.seconds) {
+      val logListener = system.actorOf(Props(classOf[LogListener], testActor), "logListener")
+      system.eventStream.subscribe(logListener, classOf[Info])
+      cluster.joinSeedNodes(seedNodes)
+      awaitMembersUp(roles.size)
+    }
+
+    "join and remove transient nodes without growing gossip payload" taggedAs LongRunningTest in {
+      // This test is configured with log-frame-size-exceeding and the LogListener
+      // will send to the testActor if there is an unexpected increase in message payload size.
+      // It will fail after a while if vector clock entries of removed nodes are not pruned.
+      for (n ← 1 to rounds) {
+        log.info("round-" + n)
+        val systems = Vector.fill(5)(ActorSystem(system.name, system.settings.config))
+        systems.foreach { s ⇒
+          muteDeadLetters()(s)
+          Cluster(s).joinSeedNodes(seedNodes)
+        }
+        awaitAllMembersUp(systems)
+        enterBarrier("members-up-" + n)
+        systems.foreach { node ⇒
+          if (n % 2 == 0)
+            Cluster(node).down(Cluster(node).selfAddress)
+          else
+            Cluster(node).leave(Cluster(node).selfAddress)
+        }
+        awaitRemoved(systems)
+        enterBarrier("members-removed-" + n)
+        systems.foreach(_.terminate().await)
+        log.info("end of round-" + n)
+        // log listener will send to testActor if payload size exceeds the configured log-frame-size-exceeding
+        expectNoMsg(2.seconds)
+      }
+      expectNoMsg(5.seconds)
+    }
+
+  }
+}
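NodeChurnSpec verifies the bound end to end by watching serialized payload sizes; `assertLatestGossip()` in ClusterDaemon checks it directly on every gossip update: after pruning, the version vector may hold at most one entry per current member. A standalone restatement of that invariant over toy types (`ToyGossip` is a made-up stand-in; the real check is additionally gated by `Cluster.isAssertInvariantsEnabled`, which is defined outside this diff and assumed to be a JVM-property toggle so the assertion stays off in production):

```scala
object GossipInvariantSketch extends App {
  // made-up stand-in for the real Gossip class
  final case class ToyGossip(members: Set[String], version: Map[String, Long])

  def assertInvariant(g: ToyGossip): Unit =
    if (g.version.size > g.members.size)
      throw new IllegalStateException(s"Too many vector clock entries in gossip state $g")

  // fine: one vector clock entry per current member
  assertInvariant(ToyGossip(Set("a", "b"), Map("a" -> 1L, "b" -> 1L)))

  // would throw: "b" left the cluster but its entry was never pruned
  // assertInvariant(ToyGossip(Set("a"), Map("a" -> 1L, "b" -> 2L)))
}
```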
diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala
index ba31c98f68..6dc396e76a 100644
--- a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala
@@ -54,7 +54,7 @@ class VectorClockSpec extends AkkaSpec {
     }
 
     "pass misc comparison test 3" in {
-      var clock1_1 = VectorClock()
+      val clock1_1 = VectorClock()
       val clock2_1 = clock1_1 :+ Node("1")
 
       val clock1_2 = VectorClock()
@@ -239,12 +239,34 @@ class VectorClockSpec extends AkkaSpec {
       val a1 = a :+ node1
       val b1 = b :+ node2
 
-      var a2 = a1 :+ node1
-      var c = a2.merge(b1)
-      var c1 = c :+ node3
+      val a2 = a1 :+ node1
+      val c = a2.merge(b1)
+      val c1 = c :+ node3
 
       (c1 > a2) should ===(true)
       (c1 > b1) should ===(true)
     }
+
+    "support pruning" in {
+      val node1 = Node("1")
+      val node2 = Node("2")
+      val node3 = Node("3")
+
+      val a = VectorClock()
+      val b = VectorClock()
+
+      val a1 = a :+ node1
+      val b1 = b :+ node2
+
+      val c = a1.merge(b1)
+      val c1 = c.prune(node1) :+ node3
+      c1.versions.contains(node1) should be(false)
+      (c1 <> c) should be(true)
+
+      (c.prune(node1) merge c1).versions.contains(node1) should be(false)
+
+      val c2 = c :+ node2
+      (c1 <> c2) should be(true)
+    }
   }
 }
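The new "support pruning" test also pins down an important semantic detail: pruning makes a clock concurrent (`<>`) with its ancestor rather than ordered after it, so two nodes that disagree about a removal end up in the conflict/merge branch of `receiveGossip` (where both sides are pruned again) instead of one side silently dominating the other. A hedged usage sketch of the same API outside the test suite; `VectorClock` is cluster-internal, so this assumes akka-cluster on the classpath:

```scala
import akka.cluster.VectorClock
import akka.cluster.VectorClock.Node

object PruneUsage extends App {
  // two nodes each bump their own entry, then the clocks are merged
  val c  = (VectorClock() :+ Node("1")) merge (VectorClock() :+ Node("2"))
  val c1 = c.prune(Node("1")) :+ Node("3")

  // the pruned entry is gone...
  assert(!c1.versions.contains(Node("1")))
  // ...and the pruned clock does not dominate its ancestor: they are concurrent,
  // which forces receivers into the merge branch where pruning is reapplied
  assert(c1 <> c)
}
```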