Merge pull request #1349 from akka/wip-2594-quarantine-removed-member-patriknw
Only quarantine removed member that was unreachable, see #2594
This commit is contained in:
commit
d04d44b184
1 changed files with 25 additions and 8 deletions
|
|
@ -11,6 +11,7 @@ import akka.cluster.ClusterEvent.CurrentClusterState
|
||||||
import akka.cluster.ClusterEvent.MemberEvent
|
import akka.cluster.ClusterEvent.MemberEvent
|
||||||
import akka.cluster.ClusterEvent.MemberUp
|
import akka.cluster.ClusterEvent.MemberUp
|
||||||
import akka.cluster.ClusterEvent.MemberRemoved
|
import akka.cluster.ClusterEvent.MemberRemoved
|
||||||
|
import akka.cluster.ClusterEvent.UnreachableMember
|
||||||
import akka.remote.FailureDetectorRegistry
|
import akka.remote.FailureDetectorRegistry
|
||||||
import akka.remote.RemoteWatcher
|
import akka.remote.RemoteWatcher
|
||||||
|
|
||||||
|
|
@ -57,16 +58,20 @@ private[cluster] class ClusterRemoteWatcher(
|
||||||
|
|
||||||
import RemoteWatcher._
|
import RemoteWatcher._
|
||||||
|
|
||||||
|
val cluster = Cluster(context.system)
|
||||||
|
import cluster.selfAddress
|
||||||
|
|
||||||
var clusterNodes: Set[Address] = Set.empty
|
var clusterNodes: Set[Address] = Set.empty
|
||||||
|
|
||||||
override def preStart(): Unit = {
|
override def preStart(): Unit = {
|
||||||
super.preStart()
|
super.preStart()
|
||||||
Cluster(context.system).subscribe(self, classOf[MemberEvent])
|
cluster.subscribe(self, classOf[MemberEvent])
|
||||||
|
cluster.subscribe(self, classOf[UnreachableMember])
|
||||||
}
|
}
|
||||||
|
|
||||||
override def postStop(): Unit = {
|
override def postStop(): Unit = {
|
||||||
super.postStop()
|
super.postStop()
|
||||||
Cluster(context.system).unsubscribe(self)
|
cluster.unsubscribe(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def receive = receiveClusterEvent orElse super.receive
|
override def receive = receiveClusterEvent orElse super.receive
|
||||||
|
|
@ -75,16 +80,28 @@ private[cluster] class ClusterRemoteWatcher(
|
||||||
case WatchRemote(watchee, watcher) if clusterNodes(watchee.path.address) ⇒
|
case WatchRemote(watchee, watcher) if clusterNodes(watchee.path.address) ⇒
|
||||||
() // cluster managed node, don't propagate to super
|
() // cluster managed node, don't propagate to super
|
||||||
case state: CurrentClusterState ⇒
|
case state: CurrentClusterState ⇒
|
||||||
clusterNodes = state.members.map(_.address)
|
clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
|
||||||
clusterNodes foreach takeOverResponsibility
|
clusterNodes foreach takeOverResponsibility
|
||||||
|
unreachable = state.unreachable.collect { case m if m.address != selfAddress ⇒ m.address }
|
||||||
case MemberUp(m) ⇒
|
case MemberUp(m) ⇒
|
||||||
|
if (m.address != selfAddress) {
|
||||||
clusterNodes += m.address
|
clusterNodes += m.address
|
||||||
takeOverResponsibility(m.address)
|
takeOverResponsibility(m.address)
|
||||||
|
unreachable -= m.address
|
||||||
|
}
|
||||||
|
case UnreachableMember(m) ⇒
|
||||||
|
if (m.address != selfAddress)
|
||||||
|
unreachable += m.address
|
||||||
case MemberRemoved(m) ⇒
|
case MemberRemoved(m) ⇒
|
||||||
|
if (m.address != selfAddress) {
|
||||||
clusterNodes -= m.address
|
clusterNodes -= m.address
|
||||||
|
if (unreachable contains m.address) {
|
||||||
quarantine(m.address, m.uniqueAddress.uid)
|
quarantine(m.address, m.uniqueAddress.uid)
|
||||||
|
unreachable -= m.address
|
||||||
|
}
|
||||||
publishAddressTerminated(m.address)
|
publishAddressTerminated(m.address)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When a cluster node is added this class takes over the
|
* When a cluster node is added this class takes over the
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue