diff --git a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala
index 2a5c9585b3..6f187be058 100644
--- a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala
@@ -179,7 +179,7 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
   /** Idempotent, become active if this node is n-th oldest and should monitor other nodes */
   private def becomeActiveIfResponsibleForHeartbeat(): Unit = {
     if (!activelyMonitoring && selfIsResponsibleForCrossDcHeartbeat()) {
-      if (verboseHeartbeat) log.debug("Becoming ACTIVE (for DC: {}), monitoring other DCs oldest nodes", selfDataCenter)
+      log.info("Cross DC heartbeat becoming ACTIVE on this node (for DC: {}), monitoring other DCs oldest nodes", selfDataCenter)
       activelyMonitoring = true
 
       context.become(active orElse introspecting)
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala
new file mode 100644
index 0000000000..71fab2632c
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala
@@ -0,0 +1,204 @@
+/**
+ * Copyright (C) 2009-2017 Lightbend Inc.
+ */
+package akka.cluster
+
+import akka.actor.ActorSelection
+import akka.annotation.InternalApi
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+import scala.collection.immutable
+import scala.collection.immutable.SortedSet
+import scala.concurrent.duration._
+
+/**
+ * Multi-node configuration with five roles in two data centers: `first`, `second` and `third` are
+ * configured as `alpha`, `fourth` and `fifth` as `beta`; `cross-data-center-connections` is set to 2.
+ */
+object MultiDcHeartbeatTakingOverSpecMultiJvmSpec extends MultiNodeConfig {
+  val first = role("first") // alpha
+  val second = role("second") // alpha
+  val third = role("third") // alpha
+
+  val fourth = role("fourth") // beta
+  val fifth = role("fifth") // beta
+
+  nodeConfig(first, second, third)(ConfigFactory.parseString(
+    """
+      akka {
+        cluster.multi-data-center.self-data-center = alpha
+      }
+    """))
+
+  nodeConfig(fourth, fifth)(ConfigFactory.parseString(
+    """
+      akka {
+        cluster.multi-data-center.self-data-center = beta
+      }
+    """))
+
+  commonConfig(ConfigFactory.parseString(
+    """
+      akka {
+        actor.provider = cluster
+
+        loggers = ["akka.testkit.TestEventListener"]
+        loglevel = INFO
+
+        remote.log-remote-lifecycle-events = off
+
+        cluster {
+          debug.verbose-heartbeat-logging = off
+
+          multi-data-center {
+            cross-data-center-connections = 2
+          }
+        }
+      }
+    """))
+
+}
+
+class MultiDcHeartbeatTakingOverSpecMultiJvmNode1 extends MultiDcHeartbeatTakingOverSpec
+class MultiDcHeartbeatTakingOverSpecMultiJvmNode2 extends MultiDcHeartbeatTakingOverSpec
+class MultiDcHeartbeatTakingOverSpecMultiJvmNode3 extends MultiDcHeartbeatTakingOverSpec
+class MultiDcHeartbeatTakingOverSpecMultiJvmNode4 extends MultiDcHeartbeatTakingOverSpec
+class MultiDcHeartbeatTakingOverSpecMultiJvmNode5 extends MultiDcHeartbeatTakingOverSpec
+
+/**
+ * Checks which nodes report an active cross data center heartbeat sender, and that after one of the
+ * oldest `alpha` nodes leaves the cluster the remaining oldest `alpha` nodes report `MonitoringActive`.
+ */
+abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeartbeatTakingOverSpecMultiJvmSpec)
+  with MultiNodeClusterSpec {
+
+  "A 2-dc cluster" must {
+
+    val observer: TestProbe = TestProbe("alpha-observer")
+
+    val crossDcHeartbeatSenderPath = "/system/cluster/core/daemon/crossDcHeartbeatSender"
+    val selectCrossDcHeartbeatSender: ActorSelection = system.actorSelection(crossDcHeartbeatSenderPath)
+
+    // these will be filled in during the initial phase of the test -----------
+    var expectedAlphaHeartbeaterNodes: SortedSet[Member] = SortedSet.empty
+    var expectedAlphaHeartbeaterRoles: SortedSet[RoleName] = SortedSet.empty
+
+    var expectedBetaHeartbeaterNodes: SortedSet[Member] = SortedSet.empty
+    var expectedBetaHeartbeaterRoles: SortedSet[RoleName] = SortedSet.empty
+
+    var expectedNoActiveHeartbeatSenderRoles: Set[RoleName] = Set.empty
+    // end of these will be filled in during the initial phase of the test -----------
+
+    // recomputes the expected heartbeating / dormant roles from this node's current `cluster.state`
+    def refreshOldestMemberHeartbeatStatuses() = {
+      expectedAlphaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "alpha", 2)
+      expectedAlphaHeartbeaterRoles = membersAsRoles(expectedAlphaHeartbeaterNodes)
+
+      expectedBetaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "beta", 2)
+      expectedBetaHeartbeaterRoles = membersAsRoles(expectedBetaHeartbeaterNodes)
+
+      expectedNoActiveHeartbeatSenderRoles = roles.toSet -- (expectedAlphaHeartbeaterRoles union expectedBetaHeartbeaterRoles)
+    }
+
+    "collect information on oldest nodes" taggedAs LongRunningTest in {
+      // allow all nodes to join:
+      awaitClusterUp(roles: _*)
+
+      refreshOldestMemberHeartbeatStatuses()
+      info(s"expectedAlphaHeartbeaterNodes = ${expectedAlphaHeartbeaterNodes.map(_.address.port.get)}")
+      info(s"expectedBetaHeartbeaterNodes = ${expectedBetaHeartbeaterNodes.map(_.address.port.get)}")
+      info(s"expectedNoActiveHeartbeatSenderRoles = ${expectedNoActiveHeartbeatSenderRoles.map(_.port.get)}")
+
+      expectedAlphaHeartbeaterRoles.size should ===(2)
+      expectedBetaHeartbeaterRoles.size should ===(2)
+
+      enterBarrier("found-expectations")
+    }
+
+    "be healthy" taggedAs LongRunningTest in {
+      implicit val sender = observer.ref
+      runOn(expectedAlphaHeartbeaterRoles.toList: _*) {
+        selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
+        observer.expectMsgType[CrossDcHeartbeatSender.MonitoringActive](5.seconds)
+      }
+      runOn(expectedBetaHeartbeaterRoles.toList: _*) {
+        selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
+        observer.expectMsgType[CrossDcHeartbeatSender.MonitoringActive](5.seconds)
+      }
+      runOn(expectedNoActiveHeartbeatSenderRoles.toList: _*) {
+        selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
+        observer.expectMsgType[CrossDcHeartbeatSender.MonitoringDormant](5.seconds)
+      }
+
+      enterBarrier("sunny-weather-done")
+    }
+
+    "other node must become oldest when current DC-oldest Leaves" taggedAs LongRunningTest in {
+      val observer = TestProbe("alpha-observer-prime")
+
+      // we leave one of the current oldest nodes of the `alpha` DC,
+      // since it has 3 members the "not yet oldest" one becomes oldest and should start monitoring across datacenter
+      val preLeaveOldestAlphaRole = expectedAlphaHeartbeaterRoles.head
+      val preLeaveOldestAlphaAddress = expectedAlphaHeartbeaterNodes.find(_.address.port.get == preLeaveOldestAlphaRole.port.get).get.address
+      runOn(preLeaveOldestAlphaRole) {
+        info(s"Leaving: ${preLeaveOldestAlphaAddress}")
+        // the leave itself is issued by awaitMemberRemoved, after the other nodes registered their listeners
+      }
+
+      awaitMemberRemoved(preLeaveOldestAlphaAddress)
+      enterBarrier("wat")
+
+      // refresh our view about who is currently monitoring things in alpha:
+      refreshOldestMemberHeartbeatStatuses()
+
+      enterBarrier("after-alpha-monitoring-node-left")
+
+      implicit val sender = observer.ref
+      val expectedAlphaMonitoringNodesAfterLeaving = (takeNOldestMembers(_.dataCenter == "alpha", 3).filterNot(_.status == MemberStatus.Exiting))
+      runOn(membersAsRoles(expectedAlphaMonitoringNodesAfterLeaving).toList: _*) {
+        awaitAssert({
+
+          selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
+
+          try {
+            observer.expectMsgType[CrossDcHeartbeatSender.MonitoringActive](5.seconds)
+            info(s"Got confirmation from ${observer.lastSender} that it is actively monitoring now")
+          } catch {
+            case scala.util.control.NonFatal(ex) ⇒
+              throw new AssertionError(s"Monitoring was Dormant on ${cluster.selfAddress}, where we expected it to be active!", ex)
+          }
+        }, 20.seconds)
+      }
+      enterBarrier("confirmed-heartbeating-take-over")
+    }
+  }
+
+  /**
+   * INTERNAL API
+   * Returns `Up` (or in "later" status, like Leaving etc, but never `Joining` or `WeaklyUp`) members,
+   * sorted by Member.ageOrdering (from oldest to youngest). This restriction on status is needed to
+   * strongly guarantee the order of "oldest" members, as they're linearized by the order in which they become Up
+   * (since marking that transition is a Leader action).
+   */
+  private def membersByAge(): immutable.SortedSet[Member] =
+    SortedSet.empty(Member.ageOrdering)
+      .union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))
+
+  /** INTERNAL API: the (at most) `n` oldest members accepted by `memberFilter`, never `Joining` or `WeaklyUp` ones */
+  @InternalApi
+  private[cluster] def takeNOldestMembers(memberFilter: Member ⇒ Boolean, n: Int): immutable.SortedSet[Member] =
+    membersByAge()
+      .filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp)
+      .filter(memberFilter)
+      .take(n)
+
+  /** Maps each member to its role via `roleName`; the `require` fails if some member could not be mapped. */
+  private def membersAsRoles(ms: SortedSet[Member]): SortedSet[RoleName] = {
+    val res = ms.flatMap(m ⇒ roleName(m.address))
+    require(res.size == ms.size, s"Not all members were converted to roles! Got: ${ms}, found ${res}")
+    res
+  }
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
index fa461bc769..d83c159d58 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -14,7 +14,7 @@ import akka.remote.testconductor.RoleName
 import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec, STMultiNodeSpec }
 import akka.testkit._
 import akka.testkit.TestEvent._
-import akka.actor.{ ActorSystem, Address }
+import akka.actor.{ Actor, ActorRef, ActorSystem, Address, Deploy, PoisonPill, Props, RootActorPath }
 import akka.event.Logging.ErrorLevel
 import scala.concurrent.duration._
 
@@ -22,9 +22,9 @@ import scala.concurrent.duration._
 import scala.collection.immutable
 import java.util.concurrent.ConcurrentHashMap
 import akka.remote.DefaultFailureDetectorRegistry
-import akka.actor.ActorRef
-import akka.actor.Actor
-import akka.actor.RootActorPath
+import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberEvent, MemberExited, MemberRemoved }
+
+import scala.concurrent.Await
 
 object MultiNodeClusterSpec {
 
@@ -312,6 +312,52 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
     }
   }
 
+  /**
+   * Coordinates the departure of the member at `toBeRemovedAddress`: the node owning that address
+   * issues `cluster.leave` and waits until its own `Cluster` is terminated, every other node waits
+   * (up to `timeout`) for the matching `MemberRemoved` event and then marks the node as unavailable.
+   * Synchronizes via `enterBarrier`, so it is expected to be invoked by every participating node.
+   */
+  def awaitMemberRemoved(toBeRemovedAddress: Address, timeout: FiniteDuration = 25.seconds): Unit = within(timeout) {
+    if (toBeRemovedAddress == cluster.selfAddress) {
+      enterBarrier("registered-listener")
+
+      cluster.leave(toBeRemovedAddress)
+      enterBarrier("member-left")
+
+      awaitCond(cluster.isTerminated, remaining)
+      enterBarrier("member-shutdown")
+    } else {
+      val removedLatch = TestLatch()
+
+      val awaiter = system.actorOf(Props(new Actor {
+        def receive = {
+          case MemberRemoved(m, _) if m.address == toBeRemovedAddress ⇒
+            removedLatch.countDown()
+          case _ ⇒
+          // ignore
+        }
+      }).withDeploy(Deploy.local))
+      cluster.subscribe(awaiter, classOf[MemberEvent])
+      enterBarrier("registered-listener")
+
+      // in the meantime member issues leave
+      enterBarrier("member-left")
+
+      // wait until the member has been REMOVED (signalled by the awaiter actor above)
+      try Await.result(removedLatch, timeout) catch {
+        case cause: Exception ⇒
+          throw new AssertionError(s"Member ${toBeRemovedAddress} was not removed within ${timeout}!", cause)
+      }
+      awaiter ! PoisonPill // you've done your job, now die
+
+      enterBarrier("member-shutdown")
+      markNodeAsUnavailable(toBeRemovedAddress)
+    }
+
+    enterBarrier("member-totally-shutdown")
+  }
+
   def awaitAllReachable(): Unit =
     awaitAssert(clusterView.unreachableMembers should ===(Set.empty))
 