Add previousStatus in MemberRemoved, see #3252

This commit is contained in:
Patrik Nordwall 2013-05-23 11:09:32 +02:00
parent f3f55d4972
commit ee6e80d31a
15 changed files with 39 additions and 42 deletions

View file

@ -111,8 +111,12 @@ object ClusterEvent {
/**
* Member completely removed from the cluster.
* When `previousStatus` is `MemberStatus.Down` the node was removed
* after being detected as unreachable and downed.
* When `previousStatus` is `MemberStatus.Exiting` the node was removed
* after graceful leaving and exiting.
*/
case class MemberRemoved(member: Member) extends MemberEvent {
case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent {
if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
}
@ -200,7 +204,7 @@ object ClusterEvent {
val removedMembers = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) ++
(oldGossip.overview.unreachable -- newGossip.overview.unreachable)
val removedEvents = removedMembers.map(m MemberRemoved(m.copy(status = Removed)))
val removedEvents = removedMembers.map(m MemberRemoved(m.copy(status = Removed), m.status))
(new VectorBuilder[MemberEvent]() ++= memberEvents ++= removedEvents).result()
}

View file

@ -126,7 +126,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
case HeartbeatTick heartbeat()
case MemberUp(m) addMember(m)
case UnreachableMember(m) removeMember(m)
case MemberRemoved(m) removeMember(m)
case MemberRemoved(m, _) removeMember(m)
case s: CurrentClusterState reset(s)
case MemberExited(m) memberExited(m)
case _: MemberEvent // not interested in other types of MemberEvent

View file

@ -87,7 +87,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
case msg: MetricsGossipEnvelope receiveGossip(msg)
case state: CurrentClusterState receiveState(state)
case MemberUp(m) addMember(m)
case MemberRemoved(m) removeMember(m)
case MemberRemoved(m, _) removeMember(m)
case MemberExited(m) removeMember(m)
case UnreachableMember(m) removeMember(m)
case _: MemberEvent // not interested in other types of MemberEvent

View file

@ -49,7 +49,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
case e: ClusterDomainEvent e match {
case SeenChanged(convergence, seenBy)
state = state.copy(seenBy = seenBy)
case MemberRemoved(member)
case MemberRemoved(member, _)
state = state.copy(members = state.members - member, unreachable = state.unreachable - member)
case UnreachableMember(member)
// replace current member with new member (might have different status, only address is used in equals)

View file

@ -11,7 +11,6 @@ import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ClusterEvent.UnreachableMember
import akka.remote.FailureDetectorRegistry
import akka.remote.RemoteWatcher
@ -63,7 +62,6 @@ private[cluster] class ClusterRemoteWatcher(
override def preStart(): Unit = {
super.preStart()
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
}
override def postStop(): Unit = {
@ -79,24 +77,18 @@ private[cluster] class ClusterRemoteWatcher(
case state: CurrentClusterState
clusterNodes = state.members.collect { case m if m.address != selfAddress m.address }
clusterNodes foreach takeOverResponsibility
val clusterUnreachable = state.unreachable.collect { case m if m.address != selfAddress m.address }
unreachable --= clusterNodes
unreachable ++= clusterUnreachable
case MemberUp(m)
if (m.address != selfAddress) {
clusterNodes += m.address
takeOverResponsibility(m.address)
unreachable -= m.address
}
case UnreachableMember(m)
if (m.address != selfAddress)
unreachable += m.address
case MemberRemoved(m)
case MemberRemoved(m, previousStatus)
if (m.address != selfAddress) {
clusterNodes -= m.address
if (unreachable contains m.address) {
if (previousStatus == MemberStatus.Down) {
quarantine(m.address, m.uniqueAddress.uid)
unreachable -= m.address
}
publishAddressTerminated(m.address)
}

View file

@ -54,7 +54,7 @@ abstract class MembershipChangeListenerExitingSpec
exitingLatch.countDown()
case MemberExited(m) if m.address == secondAddress
exitingLatch.countDown()
case MemberRemoved(m) if m.address == secondAddress
case MemberRemoved(m, Exiting) if m.address == secondAddress
removedLatch.countDown()
case _ // ignore
}

View file

@ -48,7 +48,7 @@ abstract class NodeLeavingAndExitingSpec
if (state.members.exists(m m.address == secondAddess && m.status == Exiting))
exitingLatch.countDown()
case MemberExited(m) if m.address == secondAddess exitingLatch.countDown()
case MemberRemoved(m) // not tested here
case _: MemberRemoved // not tested here
}
})), classOf[MemberEvent])

View file

@ -89,9 +89,9 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
memberSubscriber.expectNoMsg(1 second)
// at the removed member a an empty gossip is the last thing
publisher ! PublishChanges(Gossip.empty)
memberSubscriber.expectMsg(MemberRemoved(aRemoved))
memberSubscriber.expectMsg(MemberRemoved(bRemoved))
memberSubscriber.expectMsg(MemberRemoved(cRemoved))
memberSubscriber.expectMsg(MemberRemoved(aRemoved, Exiting))
memberSubscriber.expectMsg(MemberRemoved(bRemoved, Exiting))
memberSubscriber.expectMsg(MemberRemoved(cRemoved, Up))
memberSubscriber.expectMsg(LeaderChanged(None))
}
@ -150,7 +150,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"publish Removed when stopped" in {
publisher ! PoisonPill
memberSubscriber.expectMsg(MemberRemoved(aRemoved))
memberSubscriber.expectMsg(MemberRemoved(aRemoved, Up))
}
}

View file

@ -73,10 +73,10 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers {
}
"be produced for removed members" in {
val (g1, _) = converge(Gossip(members = SortedSet(aUp, dLeaving)))
val (g1, _) = converge(Gossip(members = SortedSet(aUp, dExiting)))
val (g2, s2) = converge(Gossip(members = SortedSet(aUp)))
diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(dRemoved)))
diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(dRemoved, Exiting)))
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
@ -97,7 +97,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers {
val (g1, _) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining)))
val (g2, s2) = converge(Gossip(members = SortedSet(bUp, eJoining)))
diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(aRemoved)))
diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(aRemoved, Up)))
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
diffLeader(g1, g2) must be(Seq(LeaderChanged(Some(bUp.address))))

View file

@ -453,7 +453,7 @@ class ClusterReceptionist(
consistentHash = ConsistentHash(nodes, virtualNodesFactor)
}
case MemberRemoved(m)
case MemberRemoved(m, _)
if (m.address == selfAddress)
context stop self
else if (matchingRole(m)) {

View file

@ -489,13 +489,13 @@ class ClusterSingletonManager(
stay using YoungerData(oldestOption)
}
case Event(MemberRemoved(m), YoungerData(Some(previousOldest))) if m.address == previousOldest
case Event(MemberRemoved(m, _), YoungerData(Some(previousOldest))) if m.address == previousOldest
logInfo("Previous oldest removed [{}]", m.address)
addRemoved(m.address)
// transition when OldestChanged
stay using YoungerData(None)
case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress
case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
@ -518,7 +518,7 @@ class ClusterSingletonManager(
stay
}
case Event(MemberRemoved(m), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest
case Event(MemberRemoved(m, _), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest
logInfo("Previous oldest [{}] removed", previousOldest)
addRemoved(m.address)
stay
@ -600,7 +600,7 @@ class ClusterSingletonManager(
case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, handOverData, _))
gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender))
case Event(MemberRemoved(m), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if !selfExited && m.address == newOldest
case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if !selfExited && m.address == newOldest
addRemoved(m.address)
gotoHandingOver(singleton, singletonTerminated, handOverData, None)
@ -647,7 +647,7 @@ class ClusterSingletonManager(
}
when(End) {
case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress
case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
}
@ -660,7 +660,7 @@ class ClusterSingletonManager(
logInfo("Exited [{}]", m.address)
}
stay
case Event(MemberRemoved(m), _)
case Event(MemberRemoved(m, _), _)
if (!selfExited) logInfo("Member removed [{}]", m.address)
addRemoved(m.address)
stay

View file

@ -342,7 +342,7 @@ class DistributedPubSubMediator(
if (matchingRole(m))
nodes += m.address
case MemberRemoved(m)
case MemberRemoved(m, _)
if (m.address == selfAddress)
context stop self
else if (matchingRole(m)) {

View file

@ -164,9 +164,9 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect {
case m if m.hasRole(role) m
}
case MemberUp(m) if (m.hasRole(role)) membersByAge += m
case MemberRemoved(m) if (m.hasRole(role)) membersByAge -= m
case other consumer foreach { _.tell(other, sender) }
case MemberUp(m) if (m.hasRole(role)) membersByAge += m
case MemberRemoved(m, _) if (m.hasRole(role)) membersByAge -= m
case other consumer foreach { _.tell(other, sender) }
}
def consumer: Option[ActorSelection] =

View file

@ -23,13 +23,14 @@ object SimpleClusterApp {
class SimpleClusterListener extends Actor with ActorLogging {
def receive = {
case state: CurrentClusterState
log.info("Current members: {}", state.members)
log.info("Current members: {}", state.members.mkString(", "))
case MemberUp(member)
log.info("Member is Up: {}", member)
log.info("Member is Up: {}", member.address)
case UnreachableMember(member)
log.info("Member detected as unreachable: {}", member)
case MemberRemoved(member)
log.info("Member is Removed: {}", member)
case MemberRemoved(member, previousStatus)
log.info("Member is Removed: {} after {}",
member.address, previousStatus)
case _: ClusterDomainEvent // ignore
}
}

View file

@ -111,9 +111,9 @@ class StatsFacade extends Actor with ActorLogging {
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect {
case m if m.hasRole("compute") m
}
case MemberUp(m) if (m.hasRole("compute")) membersByAge += m
case MemberRemoved(m) if (m.hasRole("compute")) membersByAge -= m
case _: MemberEvent // not interesting
case MemberUp(m) if (m.hasRole("compute")) membersByAge += m
case MemberRemoved(m, _) if (m.hasRole("compute")) membersByAge -= m
case _: MemberEvent // not interesting
}
def currentMaster: ActorSelection =