From 079aa467331a8b046e9da2275cfe2e5a9f8810ad Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Mon, 5 Nov 2018 11:03:06 +0100 Subject: [PATCH] Introduce 'MemberDowned' member event (#25854) * Introduce 'MemberDowned' member event Compatiblity note: MemberEvent is a sealed trait, so it is debatable whether it is acceptable to introduce a new member. * Be more conservative (more like leaving), add test --- .../akka/cluster/pubsub/DistributedPubSubMediator.scala | 6 ++++++ .../akka/cluster/typed/BasicClusterExampleSpec.scala | 1 + .../src/main/scala/akka/cluster/ClusterEvent.scala | 9 +++++++++ .../scala/akka/cluster/CoordinatedShutdownLeave.scala | 6 +++++- .../test/scala/akka/cluster/ClusterDomainEventSpec.scala | 8 ++++++++ .../src/test/scala/akka/cluster/ClusterSpec.scala | 1 + 6 files changed, 30 insertions(+), 1 deletion(-) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala index 414a59d1db..2e308d4990 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala @@ -707,6 +707,12 @@ class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Act registry -= m.address } + case MemberDowned(m) ⇒ + if (matchingRole(m)) { + nodes -= m.address + registry -= m.address + } + case MemberRemoved(m, _) ⇒ if (m.address == selfAddress) context stop self diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala index 1d95cc947a..de4569610f 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala @@ -212,6 +212,7 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually system1.log.info("Downing node 3") cluster1.manager ! Down(cluster3.selfMember.address) + probe1.expectMessageType[MemberDowned].member.address shouldEqual cluster3.selfMember.address probe1.expectMessageType[MemberRemoved](10.seconds).member.address shouldEqual cluster3.selfMember.address probe1.expectNoMessage() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 7878915c63..d12f129889 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -263,6 +263,14 @@ object ClusterEvent { if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) } + /** + * Member status changed to `MemberStatus.Down` and will be removed + * when all members have seen the `Down` status. + */ + final case class MemberDowned(member: Member) extends MemberEvent { + if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member) + } + /** * Member completely removed from the cluster. * When `previousStatus` is `MemberStatus.Down` the node was removed @@ -449,6 +457,7 @@ object ClusterEvent { case m if m.status == Up ⇒ MemberUp(m) case m if m.status == Leaving ⇒ MemberLeft(m) case m if m.status == Exiting ⇒ MemberExited(m) + case m if m.status == Down ⇒ MemberDowned(m) // no events for other transitions } diff --git a/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala b/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala index 71154fab8b..200e7ebafb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala +++ b/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala @@ -52,10 +52,14 @@ private[akka] class CoordinatedShutdownLeave extends Actor { case MemberLeft(m) ⇒ if (m.uniqueAddress == cluster.selfUniqueAddress) done(replyTo) - case MemberRemoved(m, _) ⇒ + case MemberDowned(m) ⇒ // in case it was downed instead if (m.uniqueAddress == cluster.selfUniqueAddress) done(replyTo) + case MemberRemoved(m, _) ⇒ + // final safety fallback + if (m.uniqueAddress == cluster.selfUniqueAddress) + done(replyTo) } private def done(replyTo: ActorRef): Unit = { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index e71299892d..ed7f05f34a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -171,6 +171,14 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { state(g2, bUp.uniqueAddress)) should ===(Seq()) } + "be produced for downed members" in { + val (g1, _) = converge(Gossip(members = SortedSet(aUp, eUp))) + val (g2, _) = converge(Gossip(members = SortedSet(aUp, eDown))) + + diffMemberEvents(state(g1), state(g2)) should ===(Seq(MemberDowned(eDown))) + diffUnreachable(state(g1), state(g2)) should ===(Seq.empty) + } + "be produced for removed members" in { val (g1, _) = converge(Gossip(members = SortedSet(aUp, dExiting))) val (g2, s2) = converge(Gossip(members = SortedSet(aUp))) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 6f653d74bd..e04887641a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -244,6 +244,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { probe.expectMsgType[MemberUp] Cluster(sys3).down(Cluster(sys3).selfAddress) + probe.expectMsgType[MemberDowned] probe.expectMsgType[MemberRemoved] Await.result(sys3.whenTerminated, 10.seconds) Cluster(sys3).isTerminated should ===(true)