stop outbound streams when quarantined, #21407
* they can't be stopped immediately because we want to send some final message and we reply to inbound messages with `Quarantined` * and improve logging
This commit is contained in:
parent
11097eedad
commit
1926560e41
14 changed files with 171 additions and 80 deletions
|
|
@ -8,7 +8,7 @@ import scala.collection.immutable
|
|||
import akka.actor.{ ActorLogging, ActorSelection, Address, Actor, RootActorPath }
|
||||
import akka.cluster.ClusterEvent._
|
||||
import akka.remote.FailureDetectorRegistry
|
||||
import akka.remote.PriorityMessage
|
||||
import akka.remote.HeartbeatMessage
|
||||
import akka.actor.DeadLetterSuppression
|
||||
|
||||
/**
|
||||
|
|
@ -36,12 +36,12 @@ private[cluster] object ClusterHeartbeatSender {
|
|||
/**
|
||||
* Sent at regular intervals for failure detection.
|
||||
*/
|
||||
final case class Heartbeat(from: Address) extends ClusterMessage with PriorityMessage with DeadLetterSuppression
|
||||
final case class Heartbeat(from: Address) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression
|
||||
|
||||
/**
|
||||
* Sent as reply to [[Heartbeat]] messages.
|
||||
*/
|
||||
final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with PriorityMessage with DeadLetterSuppression
|
||||
final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression
|
||||
|
||||
// sent to self only
|
||||
case object HeartbeatTick
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import akka.cluster.ClusterEvent.MemberRemoved
|
|||
import akka.cluster.ClusterEvent.MemberWeaklyUp
|
||||
import akka.remote.FailureDetectorRegistry
|
||||
import akka.remote.RemoteWatcher
|
||||
import akka.remote.RARP
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -51,6 +52,7 @@ private[cluster] class ClusterRemoteWatcher(
|
|||
unreachableReaperInterval,
|
||||
heartbeatExpectedResponseAfter) {
|
||||
|
||||
private val arteryEnabled = RARP(context.system).provider.remoteSettings.Artery.Enabled
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.selfAddress
|
||||
|
||||
|
|
@ -89,8 +91,10 @@ private[cluster] class ClusterRemoteWatcher(
|
|||
def memberRemoved(m: Member, previousStatus: MemberStatus): Unit =
|
||||
if (m.address != selfAddress) {
|
||||
clusterNodes -= m.address
|
||||
if (previousStatus == MemberStatus.Down) {
|
||||
quarantine(m.address, Some(m.uniqueAddress.uid), "Cluster member removed")
|
||||
// TODO We should probably always quarantine when member is removed,
|
||||
// but keeping old behavior for old remoting for now
|
||||
if (arteryEnabled || previousStatus == MemberStatus.Down) {
|
||||
quarantine(m.address, Some(m.uniqueAddress.uid), s"Cluster member removed, previous status [$previousStatus]")
|
||||
}
|
||||
publishAddressTerminated(m.address)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue