diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 89b0d5f224..1721959ae3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -111,8 +111,12 @@ object ClusterEvent { /** * Member completely removed from the cluster. + * When `previousStatus` is `MemberStatus.Down` the node was removed + * after being detected as unreachable and downed. + * When `previousStatus` is `MemberStatus.Exiting` the node was removed + * after graceful leaving and exiting. */ - case class MemberRemoved(member: Member) extends MemberEvent { + case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent { if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member) } @@ -200,7 +204,7 @@ object ClusterEvent { val removedMembers = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) ++ (oldGossip.overview.unreachable -- newGossip.overview.unreachable) - val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed))) + val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed), m.status)) (new VectorBuilder[MemberEvent]() ++= memberEvents ++= removedEvents).result() } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 6b7d201065..04deb749a8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -126,7 +126,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg case HeartbeatTick ⇒ heartbeat() case MemberUp(m) ⇒ addMember(m) case UnreachableMember(m) ⇒ removeMember(m) - case MemberRemoved(m) ⇒ removeMember(m) + case MemberRemoved(m, _) ⇒ removeMember(m) case s: CurrentClusterState ⇒ reset(s) case MemberExited(m) ⇒ memberExited(m) case _: MemberEvent ⇒ // not interested in other types of MemberEvent diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 4a714bb42b..a666886c47 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -87,7 +87,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg) case state: CurrentClusterState ⇒ receiveState(state) case MemberUp(m) ⇒ addMember(m) - case MemberRemoved(m) ⇒ removeMember(m) + case MemberRemoved(m, _) ⇒ removeMember(m) case MemberExited(m) ⇒ removeMember(m) case UnreachableMember(m) ⇒ removeMember(m) case _: MemberEvent ⇒ // not interested in other types of MemberEvent diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 0b7d7442a6..6998fbb6c8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -49,7 +49,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case e: ClusterDomainEvent ⇒ e match { case SeenChanged(convergence, seenBy) ⇒ state = state.copy(seenBy = seenBy) - case MemberRemoved(member) ⇒ + case MemberRemoved(member, _) ⇒ state = state.copy(members = state.members - member, unreachable = state.unreachable - member) case UnreachableMember(member) ⇒ // replace current member with new member (might have different status, only address is used in equals) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index ae78946ac7..c3dde99738 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -11,7 +11,6 @@ import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberRemoved -import akka.cluster.ClusterEvent.UnreachableMember import akka.remote.FailureDetectorRegistry import akka.remote.RemoteWatcher @@ -63,7 +62,6 @@ private[cluster] class ClusterRemoteWatcher( override def preStart(): Unit = { super.preStart() cluster.subscribe(self, classOf[MemberEvent]) - cluster.subscribe(self, classOf[UnreachableMember]) } override def postStop(): Unit = { @@ -79,24 +77,18 @@ private[cluster] class ClusterRemoteWatcher( case state: CurrentClusterState ⇒ clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address } clusterNodes foreach takeOverResponsibility - val clusterUnreachable = state.unreachable.collect { case m if m.address != selfAddress ⇒ m.address } unreachable --= clusterNodes - unreachable ++= clusterUnreachable case MemberUp(m) ⇒ if (m.address != selfAddress) { clusterNodes += m.address takeOverResponsibility(m.address) unreachable -= m.address } - case UnreachableMember(m) ⇒ - if (m.address != selfAddress) - unreachable += m.address - case MemberRemoved(m) ⇒ + case MemberRemoved(m, previousStatus) ⇒ if (m.address != selfAddress) { clusterNodes -= m.address - if (unreachable contains m.address) { + if (previousStatus == MemberStatus.Down) { quarantine(m.address, m.uniqueAddress.uid) - unreachable -= m.address } publishAddressTerminated(m.address) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index f2b101c89d..6c5657cece 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -54,7 +54,7 @@ abstract class MembershipChangeListenerExitingSpec exitingLatch.countDown() case MemberExited(m) if m.address == secondAddress ⇒ exitingLatch.countDown() - case MemberRemoved(m) if m.address == secondAddress ⇒ + case MemberRemoved(m, Exiting) if m.address == secondAddress ⇒ removedLatch.countDown() case _ ⇒ // ignore } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index a2eec0867c..d546c8f808 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -48,7 +48,7 @@ abstract class NodeLeavingAndExitingSpec if (state.members.exists(m ⇒ m.address == secondAddess && m.status == Exiting)) exitingLatch.countDown() case MemberExited(m) if m.address == secondAddess ⇒ exitingLatch.countDown() - case MemberRemoved(m) ⇒ // not tested here + case _: MemberRemoved ⇒ // not tested here } })), classOf[MemberEvent]) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 8e9fd04b0e..eae4f5da45 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -89,9 +89,9 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec memberSubscriber.expectNoMsg(1 second) // at the removed member a an empty gossip is the last thing publisher ! PublishChanges(Gossip.empty) - memberSubscriber.expectMsg(MemberRemoved(aRemoved)) - memberSubscriber.expectMsg(MemberRemoved(bRemoved)) - memberSubscriber.expectMsg(MemberRemoved(cRemoved)) + memberSubscriber.expectMsg(MemberRemoved(aRemoved, Exiting)) + memberSubscriber.expectMsg(MemberRemoved(bRemoved, Exiting)) + memberSubscriber.expectMsg(MemberRemoved(cRemoved, Up)) memberSubscriber.expectMsg(LeaderChanged(None)) } @@ -150,7 +150,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "publish Removed when stopped" in { publisher ! PoisonPill - memberSubscriber.expectMsg(MemberRemoved(aRemoved)) + memberSubscriber.expectMsg(MemberRemoved(aRemoved, Up)) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index cba8900869..bac45879c2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -73,10 +73,10 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { } "be produced for removed members" in { - val (g1, _) = converge(Gossip(members = SortedSet(aUp, dLeaving))) + val (g1, _) = converge(Gossip(members = SortedSet(aUp, dExiting))) val (g2, s2) = converge(Gossip(members = SortedSet(aUp))) - diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(dRemoved))) + diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(dRemoved, Exiting))) diffUnreachable(g1, g2) must be(Seq.empty) diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } @@ -97,7 +97,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val (g1, _) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining))) val (g2, s2) = converge(Gossip(members = SortedSet(bUp, eJoining))) - diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(aRemoved))) + diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(aRemoved, Up))) diffUnreachable(g1, g2) must be(Seq.empty) diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) diffLeader(g1, g2) must be(Seq(LeaderChanged(Some(bUp.address)))) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala index c1c3dbbcf6..0c9829b988 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala @@ -453,7 +453,7 @@ class ClusterReceptionist( consistentHash = ConsistentHash(nodes, virtualNodesFactor) } - case MemberRemoved(m) ⇒ + case MemberRemoved(m, _) ⇒ if (m.address == selfAddress) context stop self else if (matchingRole(m)) { diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index 2a2d7b95b1..d179342cda 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -489,13 +489,13 @@ class ClusterSingletonManager( stay using YoungerData(oldestOption) } - case Event(MemberRemoved(m), YoungerData(Some(previousOldest))) if m.address == previousOldest ⇒ + case Event(MemberRemoved(m, _), YoungerData(Some(previousOldest))) if m.address == previousOldest ⇒ logInfo("Previous oldest removed [{}]", m.address) addRemoved(m.address) // transition when OldestChanged stay using YoungerData(None) - case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress ⇒ + case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress ⇒ logInfo("Self removed, stopping ClusterSingletonManager") stop() @@ -518,7 +518,7 @@ class ClusterSingletonManager( stay } - case Event(MemberRemoved(m), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest ⇒ + case Event(MemberRemoved(m, _), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest ⇒ logInfo("Previous oldest [{}] removed", previousOldest) addRemoved(m.address) stay @@ -600,7 +600,7 @@ class ClusterSingletonManager( case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, handOverData, _)) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) - case Event(MemberRemoved(m), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if !selfExited && m.address == newOldest ⇒ + case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if !selfExited && m.address == newOldest ⇒ addRemoved(m.address) gotoHandingOver(singleton, singletonTerminated, handOverData, None) @@ -647,7 +647,7 @@ class ClusterSingletonManager( } when(End) { - case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress ⇒ + case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress ⇒ logInfo("Self removed, stopping ClusterSingletonManager") stop() } @@ -660,7 +660,7 @@ class ClusterSingletonManager( logInfo("Exited [{}]", m.address) } stay - case Event(MemberRemoved(m), _) ⇒ + case Event(MemberRemoved(m, _), _) ⇒ if (!selfExited) logInfo("Member removed [{}]", m.address) addRemoved(m.address) stay diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index b3b23a8e66..51811fd934 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -342,7 +342,7 @@ class DistributedPubSubMediator( if (matchingRole(m)) nodes += m.address - case MemberRemoved(m) ⇒ + case MemberRemoved(m, _) ⇒ if (m.address == selfAddress) context stop self else if (matchingRole(m)) { diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index c2c8fdd993..9b2687bc34 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -164,9 +164,9 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect { case m if m.hasRole(role) ⇒ m } - case MemberUp(m) ⇒ if (m.hasRole(role)) membersByAge += m - case MemberRemoved(m) ⇒ if (m.hasRole(role)) membersByAge -= m - case other ⇒ consumer foreach { _.tell(other, sender) } + case MemberUp(m) ⇒ if (m.hasRole(role)) membersByAge += m + case MemberRemoved(m, _) ⇒ if (m.hasRole(role)) membersByAge -= m + case other ⇒ consumer foreach { _.tell(other, sender) } } def consumer: Option[ActorSelection] = diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala index a6e905b00c..397ffaf3b6 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala @@ -23,13 +23,14 @@ object SimpleClusterApp { class SimpleClusterListener extends Actor with ActorLogging { def receive = { case state: CurrentClusterState ⇒ - log.info("Current members: {}", state.members) + log.info("Current members: {}", state.members.mkString(", ")) case MemberUp(member) ⇒ - log.info("Member is Up: {}", member) + log.info("Member is Up: {}", member.address) case UnreachableMember(member) ⇒ log.info("Member detected as unreachable: {}", member) - case MemberRemoved(member) ⇒ - log.info("Member is Removed: {}", member) + case MemberRemoved(member, previousStatus) ⇒ + log.info("Member is Removed: {} after {}", + member.address, previousStatus) case _: ClusterDomainEvent ⇒ // ignore } } \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index bfa8cbae09..ab3df215f8 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -111,9 +111,9 @@ class StatsFacade extends Actor with ActorLogging { membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect { case m if m.hasRole("compute") ⇒ m } - case MemberUp(m) ⇒ if (m.hasRole("compute")) membersByAge += m - case MemberRemoved(m) ⇒ if (m.hasRole("compute")) membersByAge -= m - case _: MemberEvent ⇒ // not interesting + case MemberUp(m) ⇒ if (m.hasRole("compute")) membersByAge += m + case MemberRemoved(m, _) ⇒ if (m.hasRole("compute")) membersByAge -= m + case _: MemberEvent ⇒ // not interesting } def currentMaster: ActorSelection =