!clu #3683 Change cluster heartbeat to req/rsp protocol

* The previous one-way heartbeat was elegant, but complicated to
  understand and without giving much extra value compared to this approach.
* The previous one-way heartbeat had some kind of bug when joining
  several (10-20) nodes at approximately the same time (but not exactly
  the same time), with a false failure detection triggered by the extra heartbeat,
  which would not heal.
* This ping-pong approach will increase network traffic slightly, but heartbeat
  messages are small and each node is limited to monitor (default) 5 peers.
This commit is contained in:
Patrik Nordwall 2013-11-07 13:52:08 +01:00
parent 05f402c236
commit eaad7ecf7e
15 changed files with 350 additions and 439 deletions

View file

@ -65,18 +65,8 @@ message Welcome {
*/
/**
* EndHeartbeat
* Sends an Address
*/
/**
* EndHeartbeatAck
* Sends an Address
*/
/**
* HeartbeatRequest
* Sends an Address
* HeartbeatRsp
* Sends an UniqueAddress
*/
/****************************************

View file

@ -138,21 +138,12 @@ akka {
# Number of member nodes that each member will send heartbeat messages to,
# i.e. each node will be monitored by this number of other nodes.
monitored-by-nr-of-members = 5
# After the heartbeat request has been sent the first failure detection
# will start after this period, even though no heartbeat message has
# been received.
expected-response-after = 5 s
# When no expected heartbeat message has been received an explicit
# heartbeat request is sent to the node that should emit heartbeats.
heartbeat-request {
# Grace period until an explicit heartbeat request is sent
grace-period = 10 s
# After the heartbeat request has been sent the first failure detection
# will start after this period, even though no heartbeat message has
# been received.
expected-response-after = 5 s
# Cleanup of obsolete heartbeat requests
time-to-live = 60 s
}
}
metrics {

View file

@ -113,8 +113,6 @@ private[cluster] object InternalClusterAction {
case object GossipSpeedupTick extends Tick
case object HeartbeatTick extends Tick
case object ReapUnreachableTick extends Tick
case object MetricsTick extends Tick

View file

@ -4,54 +4,25 @@
package akka.cluster
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.{ ActorLogging, ActorRef, ActorSelection, Address, Actor, RootActorPath, Props }
import akka.cluster.ClusterEvent._
import akka.routing.MurmurHash
/**
* INTERNAL API
*/
private[akka] object ClusterHeartbeatReceiver {
/**
* Sent at regular intervals for failure detection.
*/
case class Heartbeat(from: Address) extends ClusterMessage
/**
* Tell failure detector at receiving side that it should
* remove the monitoring, because heartbeats will end from
* this node.
*/
case class EndHeartbeat(from: Address) extends ClusterMessage
/**
* Acknowledgment that `EndHeartbeat` was received and heartbeating
* can stop.
*/
case class EndHeartbeatAck(from: Address) extends ClusterMessage
}
import akka.remote.FailureDetectorRegistry
/**
* INTERNAL API.
*
* Receives Heartbeat messages and updates failure detector.
* Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized
* to Cluster message after message, but concurrent with other types of messages.
* Receives Heartbeat messages and replies.
*/
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
import ClusterHeartbeatReceiver._
import ClusterHeartbeatSender._
val failureDetector = Cluster(context.system).failureDetector
val selfEndHeartbeatAck = EndHeartbeatAck(Cluster(context.system).selfAddress)
val selfHeartbeatRsp = HeartbeatRsp(Cluster(context.system).selfUniqueAddress)
def receive = {
case Heartbeat(from) failureDetector heartbeat from
case EndHeartbeat(from)
failureDetector remove from
sender ! selfEndHeartbeatAck
case Heartbeat(from) sender ! selfHeartbeatRsp
}
}
@ -61,50 +32,46 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
*/
private[cluster] object ClusterHeartbeatSender {
/**
* Request heartbeats from another node. Sent from the node that is
* expecting heartbeats from a specific sender, but has not received any.
* Sent at regular intervals for failure detection.
*/
case class HeartbeatRequest(from: Address) extends ClusterMessage
case class Heartbeat(from: Address) extends ClusterMessage
/**
* Delayed sending of a HeartbeatRequest. The actual request is
* only sent if no expected heartbeat message has been received.
* Local only, no need to serialize.
* Sent as reply to [[Heartbeat]] messages.
*/
case class SendHeartbeatRequest(to: Address)
case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage
// sent to self only
case object HeartbeatTick
case class ExpectedFirstHeartbeat(from: UniqueAddress)
/**
* Trigger a fake heartbeat message to trigger start of failure detection
* of a node that this node is expecting heartbeats from. HeartbeatRequest
* has been sent to the node so it should have started sending heartbeat
* messages.
* Local only, no need to serialize.
*/
case class ExpectedFirstHeartbeat(from: Address)
}
/*
* INTERNAL API
*
* This actor is responsible for sending the heartbeat messages to
* a few other nodes that will monitor this node.
* a few other nodes, which will reply and then this actor updates the
* failure detector.
*/
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
import ClusterHeartbeatSender._
import ClusterHeartbeatReceiver._
import InternalClusterAction.HeartbeatTick
val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler }
import cluster.{ selfAddress, selfUniqueAddress, scheduler }
import cluster.settings._
import cluster.InfoLogger._
import context.dispatcher
val selfHeartbeat = Heartbeat(selfAddress)
val selfEndHeartbeat = EndHeartbeat(selfAddress)
val selfHeartbeatRequest = HeartbeatRequest(selfAddress)
// the failureDetector is only updated by this actor, but read from other places
val failureDetector = Cluster(context.system).failureDetector
var state = ClusterHeartbeatSenderState.empty(selfAddress, MonitoredByNrOfMembers)
val selfHeartbeat = Heartbeat(selfAddress)
var state = ClusterHeartbeatSenderState(
ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), MonitoredByNrOfMembers),
unreachable = Set.empty[UniqueAddress],
failureDetector)
// start periodic heartbeat to other nodes in cluster
val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay max HeartbeatInterval,
@ -115,6 +82,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
}
override def postStop(): Unit = {
state.activeReceivers.foreach(a failureDetector.remove(a.address))
heartbeatTask.cancel()
cluster.unsubscribe(self)
}
@ -125,215 +93,146 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def heartbeatReceiver(address: Address): ActorSelection =
context.actorSelection(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
/**
* Looks up and returns the remote cluster heartbeat sender for the specific address.
*/
def heartbeatSender(address: Address): ActorSelection =
context.actorSelection(self.path.toStringWithAddress(address))
def receive = initializing
def receive = {
def initializing: Actor.Receive = {
case s: CurrentClusterState
init(s)
context.become(active)
case HeartbeatTick
}
def active: Actor.Receive = {
case HeartbeatTick heartbeat()
case HeartbeatRsp(from) heartbeatRsp(from)
case MemberUp(m) addMember(m)
case MemberRemoved(m, _) removeMember(m)
case s: CurrentClusterState reset(s)
case MemberExited(m) memberExited(m)
case _: MemberEvent // not interested in other types of MemberEvent
case HeartbeatRequest(from) addHeartbeatRequest(from)
case SendHeartbeatRequest(to) sendHeartbeatRequest(to)
case ExpectedFirstHeartbeat(from) triggerFirstHeartbeat(from)
case EndHeartbeatAck(from) ackEndHeartbeat(from)
}
def reset(snapshot: CurrentClusterState): Unit =
state = state.reset(snapshot.members.collect {
case m if m.status == MemberStatus.Up m.address
}(collection.breakOut))
def addMember(m: Member): Unit = if (m.address != selfAddress) state = state addMember m.address
def removeMember(m: Member): Unit = {
if (m.uniqueAddress == cluster.selfUniqueAddress)
// This cluster node will be shutdown, but stop this actor immediately
// to prevent it from sending out anything more and avoid sending EndHeartbeat.
context stop self
else
state = state removeMember m.address
def init(snapshot: CurrentClusterState): Unit = {
val nodes: Set[UniqueAddress] = snapshot.members.collect {
case m if m.status == MemberStatus.Up m.uniqueAddress
}(collection.breakOut)
state = state.init(nodes)
}
def memberExited(m: Member): Unit =
def addMember(m: Member): Unit =
if (m.uniqueAddress != selfUniqueAddress)
state = state.addMember(m.uniqueAddress)
def removeMember(m: Member): Unit =
if (m.uniqueAddress == cluster.selfUniqueAddress) {
// This cluster node will be shutdown, but stop this actor immediately
// to prevent it from sending out anything more and avoid sending EndHeartbeat.
// to avoid further updates
context stop self
}
def addHeartbeatRequest(address: Address): Unit =
if (address != selfAddress) state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive)
def sendHeartbeatRequest(address: Address): Unit =
if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) {
heartbeatSender(address) ! selfHeartbeatRequest
// schedule the expected heartbeat for later, which will give the
// sender a chance to start heartbeating, and also trigger some resends of
// the heartbeat request
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(address))
}
def triggerFirstHeartbeat(address: Address): Unit =
if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) {
logInfo("Trigger extra expected heartbeat from [{}]", address)
cluster.failureDetector.heartbeat(address)
} else {
state = state.removeMember(m.uniqueAddress)
}
def heartbeat(): Unit = {
state = state.removeOverdueHeartbeatRequest()
state.active foreach { to
log.debug("Cluster Node [{}] - Heartbeat to [{}]", cluster.selfAddress, to)
heartbeatReceiver(to) ! selfHeartbeat
}
// When sending heartbeats to a node is stopped a `EndHeartbeat` messages are
// sent to notify it that no more heartbeats will be sent. This will continue
// until `EndHeartbeatAck` is received.
for (to state.ending) {
log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", cluster.selfAddress, to)
heartbeatReceiver(to) ! selfEndHeartbeat
}
// request heartbeats from expected sender node if no heartbeat messages has been received
state.ring.mySenders foreach { address
if (!cluster.failureDetector.isMonitoring(address))
scheduler.scheduleOnce(HeartbeatRequestDelay, self, SendHeartbeatRequest(address))
state.activeReceivers foreach { to
if (cluster.failureDetector.isMonitoring(to.address))
log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, to.address)
else {
log.debug("Cluster Node [{}] - First Heartbeat to [{}]", selfAddress, to.address)
// schedule the expected first heartbeat for later, which will give the
// other side a chance to reply, and also trigger some resends if needed
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(to))
}
heartbeatReceiver(to.address) ! selfHeartbeat
}
}
def ackEndHeartbeat(from: Address): Unit = {
state = state.removeEnding(from)
def heartbeatRsp(from: UniqueAddress): Unit = {
log.debug("Cluster Node [{}] - Heartbeat response from [{}]", selfAddress, from.address)
state = state.heartbeatRsp(from)
}
def triggerFirstHeartbeat(from: UniqueAddress): Unit =
if (state.activeReceivers(from) && !failureDetector.isMonitoring(from.address)) {
log.debug("Cluster Node [{}] - Trigger extra expected heartbeat from [{}]", selfAddress, from.address)
failureDetector.heartbeat(from.address)
}
}
/**
* INTERNAL API
* State of [[ClusterHeartbeatSender]]. Encapsulated to facilitate unit testing.
* It is immutable, but it updates the failureDetector.
*/
private[cluster] object ClusterHeartbeatSenderState {
/**
* Initial, empty state
*/
def empty(selfAddress: Address, monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(HeartbeatNodeRing(selfAddress, Set(selfAddress), monitoredByNrOfMembers))
/**
* Create a new state based on previous state, and
* keep track of which nodes to stop sending heartbeats to.
*/
private def apply(
old: ClusterHeartbeatSenderState,
ring: HeartbeatNodeRing): ClusterHeartbeatSenderState = {
val curr = ring.myReceivers
// start ending process for nodes not selected any more
// abort ending process for nodes that have been selected again
val end = old.ending ++ (old.current -- curr) -- curr
old.copy(ring = ring, current = curr, ending = end, heartbeatRequest = old.heartbeatRequest -- curr)
}
}
/**
* INTERNAL API
*
* State used by [akka.cluster.ClusterHeartbeatSender].
* The initial state is created with `empty` in the of
* the companion object, thereafter the state is modified
* with the methods, such as `addMember`. It is immutable,
* i.e. the methods return new instances.
*/
private[cluster] case class ClusterHeartbeatSenderState private (
private[cluster] case class ClusterHeartbeatSenderState(
ring: HeartbeatNodeRing,
current: Set[Address] = Set.empty,
ending: Set[Address] = Set.empty,
heartbeatRequest: Map[Address, Deadline] = Map.empty) {
unreachable: Set[UniqueAddress],
failureDetector: FailureDetectorRegistry[Address]) {
if (Cluster.isAssertInvariantsEnabled) assertInvariants()
val activeReceivers: Set[UniqueAddress] = ring.myReceivers ++ unreachable
private def assertInvariants(): Unit = {
val currentAndEnding = current.intersect(ending)
require(currentAndEnding.isEmpty,
s"Same nodes in current and ending not allowed, got [${currentAndEnding}]")
def selfAddress = ring.selfAddress
val currentAndHeartbeatRequest = current.intersect(heartbeatRequest.keySet)
require(currentAndHeartbeatRequest.isEmpty,
s"Same nodes in current and heartbeatRequest not allowed, got [${currentAndHeartbeatRequest}]")
def init(nodes: Set[UniqueAddress]): ClusterHeartbeatSenderState =
copy(ring = ring.copy(nodes = nodes + selfAddress))
val currentNotInAll = current -- ring.nodes
require(current.isEmpty || currentNotInAll.isEmpty,
s"Nodes in current but not in ring nodes not allowed, got [${currentNotInAll}]")
def addMember(node: UniqueAddress): ClusterHeartbeatSenderState =
membershipChange(ring :+ node)
require(!current.contains(ring.selfAddress),
s"Self in current not allowed, got [${ring.selfAddress}]")
require(!heartbeatRequest.contains(ring.selfAddress),
s"Self in heartbeatRequest not allowed, got [${ring.selfAddress}]")
}
def removeMember(node: UniqueAddress): ClusterHeartbeatSenderState = {
val newState = membershipChange(ring :- node)
val active: Set[Address] = current ++ heartbeatRequest.keySet
def reset(nodes: immutable.HashSet[Address]): ClusterHeartbeatSenderState = {
ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeHeartbeatRequest _ }, ring.copy(nodes = nodes + ring.selfAddress))
}
def addMember(a: Address): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(removeHeartbeatRequest(a), ring :+ a)
def removeMember(a: Address): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(removeHeartbeatRequest(a), ring :- a)
private def removeHeartbeatRequest(address: Address): ClusterHeartbeatSenderState = {
if (heartbeatRequest contains address)
copy(heartbeatRequest = heartbeatRequest - address, ending = ending + address)
else this
}
def addHeartbeatRequest(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
if (current.contains(address)) this
else copy(heartbeatRequest = heartbeatRequest + (address -> deadline), ending = ending - address)
}
/**
* Cleanup overdue heartbeatRequest
*/
def removeOverdueHeartbeatRequest(): ClusterHeartbeatSenderState = {
val overdue = heartbeatRequest collect { case (address, deadline) if deadline.isOverdue address }
if (overdue.isEmpty) this
failureDetector remove node.address
if (newState.unreachable(node))
newState.copy(unreachable = newState.unreachable - node)
else
copy(ending = ending ++ overdue, heartbeatRequest = heartbeatRequest -- overdue)
newState
}
def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a)
private def membershipChange(newRing: HeartbeatNodeRing): ClusterHeartbeatSenderState = {
val oldReceivers = ring.myReceivers
val removedReceivers = oldReceivers -- newRing.myReceivers
var newUnreachable = unreachable
removedReceivers foreach { a
if (failureDetector.isAvailable(a.address))
failureDetector remove a.address
else
newUnreachable += a
}
copy(newRing, newUnreachable)
}
def heartbeatRsp(from: UniqueAddress): ClusterHeartbeatSenderState =
if (activeReceivers(from)) {
failureDetector heartbeat from.address
if (unreachable(from)) {
// back from unreachable, ok to stop heartbeating to it
if (!ring.myReceivers(from))
failureDetector remove from.address
copy(unreachable = unreachable - from)
} else this
} else this
}
/**
* INTERNAL API
*
* Data structure for picking heartbeat receivers and keep track of what nodes
* that are expected to send heartbeat messages to a node. The node ring is
* Data structure for picking heartbeat receivers. The node ring is
* shuffled by deterministic hashing to avoid picking physically co-located
* neighbors.
*
* It is immutable, i.e. the methods return new instances.
*/
private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[Address], monitoredByNrOfMembers: Int) {
private[cluster] case class HeartbeatNodeRing(selfAddress: UniqueAddress, nodes: Set[UniqueAddress], monitoredByNrOfMembers: Int) {
require(nodes contains selfAddress, s"nodes [${nodes.mkString(", ")}] must contain selfAddress [${selfAddress}]")
private val nodeRing: immutable.SortedSet[Address] = {
implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b)
private val nodeRing: immutable.SortedSet[UniqueAddress] = {
implicit val ringOrdering: Ordering[UniqueAddress] = Ordering.fromLessThan[UniqueAddress] { (a, b)
val ha = a.##
val hb = b.##
ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0)
ha < hb || (ha == hb && Member.addressOrdering.compare(a.address, b.address) < 0)
}
immutable.SortedSet() ++ nodes
@ -342,18 +241,14 @@ private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[A
/**
* Receivers for `selfAddress`. Cached for subsequent access.
*/
lazy val myReceivers: immutable.Set[Address] = receivers(selfAddress)
/**
* Senders for `selfAddress`. Cached for subsequent access.
*/
lazy val mySenders: immutable.Set[Address] = senders(selfAddress)
lazy val myReceivers: immutable.Set[UniqueAddress] = receivers(selfAddress)
private val useAllAsReceivers = monitoredByNrOfMembers >= (nodeRing.size - 1)
/**
* The receivers to use from a specified sender.
*/
def receivers(sender: Address): immutable.Set[Address] =
def receivers(sender: UniqueAddress): immutable.Set[UniqueAddress] =
if (useAllAsReceivers)
nodeRing - sender
else {
@ -363,20 +258,14 @@ private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[A
else slice
}
/**
* The expected senders for a specific receiver.
*/
def senders(receiver: Address): Set[Address] =
nodes filter { sender receivers(sender) contains receiver }
/**
* Add a node to the ring.
*/
def :+(node: Address): HeartbeatNodeRing = if (nodes contains node) this else copy(nodes = nodes + node)
def :+(node: UniqueAddress): HeartbeatNodeRing = if (nodes contains node) this else copy(nodes = nodes + node)
/**
* Remove a node from the ring.
*/
def :-(node: Address): HeartbeatNodeRing = if (nodes contains node) copy(nodes = nodes - node) else this
def :-(node: UniqueAddress): HeartbeatNodeRing = if (nodes contains node) copy(nodes = nodes - node) else this
}

View file

@ -25,15 +25,9 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val HeartbeatInterval: FiniteDuration = {
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0")
val HeartbeatRequestDelay: FiniteDuration = {
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.grace-period"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.grace-period must be > 0")
val HeartbeatExpectedResponseAfter: FiniteDuration = {
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.expected-response-after"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.expected-response-after > 0")
val HeartbeatRequestTimeToLive: FiniteDuration = {
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-request.time-to-live"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-request.time-to-live > 0")
Duration(FailureDetectorConfig.getMilliseconds("expected-response-after"), MILLISECONDS)
} requiring (_ > Duration.Zero, "failure-detector.expected-response-after > 0")
val MonitoredByNrOfMembers: Int = {
FailureDetectorConfig.getInt("monitored-by-nr-of-members")
} requiring (_ > 0, "failure-detector.monitored-by-nr-of-members must be > 0")

View file

@ -47,10 +47,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
InternalClusterAction.InitJoin.getClass -> (_ InternalClusterAction.InitJoin),
classOf[InternalClusterAction.InitJoinAck] -> (bytes InternalClusterAction.InitJoinAck(addressFromBinary(bytes))),
classOf[InternalClusterAction.InitJoinNack] -> (bytes InternalClusterAction.InitJoinNack(addressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatReceiver.EndHeartbeatAck] -> (bytes ClusterHeartbeatReceiver.EndHeartbeatAck(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.Heartbeat] -> (bytes ClusterHeartbeatSender.Heartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.HeartbeatRsp] -> (bytes ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes))),
classOf[GossipStatus] -> gossipStatusFromBinary,
classOf[GossipEnvelope] -> gossipEnvelopeFromBinary,
classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary)
@ -60,20 +58,18 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
def identifier = 5
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case ClusterHeartbeatReceiver.Heartbeat(from) addressToProtoByteArray(from)
case m: GossipEnvelope gossipEnvelopeToProto(m).toByteArray
case m: GossipStatus gossipStatusToProto(m).toByteArray
case m: MetricsGossipEnvelope compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) joinToProto(node, roles).toByteArray
case InternalClusterAction.Welcome(from, gossip) compress(welcomeToProto(from, gossip))
case ClusterUserAction.Leave(address) addressToProtoByteArray(address)
case ClusterUserAction.Down(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoin cm.Empty.getDefaultInstance.toByteArray
case InternalClusterAction.InitJoinAck(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoinNack(address) addressToProtoByteArray(address)
case ClusterHeartbeatReceiver.EndHeartbeat(from) addressToProtoByteArray(from)
case ClusterHeartbeatReceiver.EndHeartbeatAck(from) addressToProtoByteArray(from)
case ClusterHeartbeatSender.HeartbeatRequest(from) addressToProtoByteArray(from)
case ClusterHeartbeatSender.Heartbeat(from) addressToProtoByteArray(from)
case ClusterHeartbeatSender.HeartbeatRsp(from) uniqueAddressToProtoByteArray(from)
case m: GossipEnvelope gossipEnvelopeToProto(m).toByteArray
case m: GossipStatus gossipStatusToProto(m).toByteArray
case m: MetricsGossipEnvelope compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) joinToProto(node, roles).toByteArray
case InternalClusterAction.Welcome(from, gossip) compress(welcomeToProto(from, gossip))
case ClusterUserAction.Leave(address) addressToProtoByteArray(address)
case ClusterUserAction.Down(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoin cm.Empty.getDefaultInstance.toByteArray
case InternalClusterAction.InitJoinAck(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoinNack(address) addressToProtoByteArray(address)
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
}
@ -127,6 +123,9 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
private def uniqueAddressToProto(uniqueAddress: UniqueAddress): cm.UniqueAddress.Builder =
cm.UniqueAddress.newBuilder().setAddress(addressToProto(uniqueAddress.address)).setUid(uniqueAddress.uid)
private def uniqueAddressToProtoByteArray(uniqueAddress: UniqueAddress): Array[Byte] =
uniqueAddressToProto(uniqueAddress).build.toByteArray
// we don't care about races here since it's just a cache
@volatile
private var protocolCache: String = null

View file

@ -21,7 +21,6 @@ object InitialHeartbeatMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster.failure-detector.heartbeat-request.grace-period = 3 s
akka.cluster.failure-detector.threshold = 4""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
@ -43,26 +42,35 @@ abstract class InitialHeartbeatSpec
"A member" must {
"detect failure even though no heartbeats have been received" taggedAs LongRunningTest in {
val firstAddress = address(first)
val secondAddress = address(second)
awaitClusterUp(first)
runOn(first) {
within(10 seconds) {
awaitAssert {
awaitAssert({
cluster.sendCurrentClusterState(testActor)
expectMsgType[CurrentClusterState].members.map(_.address) must contain(secondAddress)
}
}, interval = 50.millis)
}
}
runOn(second) {
cluster.join(first)
within(10 seconds) {
awaitAssert({
cluster.sendCurrentClusterState(testActor)
expectMsgType[CurrentClusterState].members.map(_.address) must contain(firstAddress)
}, interval = 50.millis)
}
}
enterBarrier("second-joined")
runOn(controller) {
// it is likely that first has not started sending heartbeats to second yet
// Direction must be Receive because the gossip from first to second must pass through
testConductor.blackhole(first, second, Direction.Receive).await
// It is likely that second has not started heartbeating to first yet,
// and when it does the messages doesn't go through and the first extra heartbeat is triggered.
// If the first heartbeat arrives, it will detect the failure anyway but not really exercise the
// part that we are trying to test here.
testConductor.blackhole(first, second, Direction.Both).await
}
runOn(second) {

View file

@ -148,7 +148,7 @@ abstract class MBeanSpec
|""".stripMargin
// awaitAssert to make sure that all nodes detects unreachable
within(5.seconds) {
within(15.seconds) {
awaitAssert(mbeanServer.getAttribute(mbeanName, "ClusterStatus") must be(expectedJson))
}
}

View file

@ -105,8 +105,8 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
}
muteDeadLetters(
classOf[ClusterHeartbeatReceiver.Heartbeat],
classOf[ClusterHeartbeatReceiver.EndHeartbeat],
classOf[ClusterHeartbeatSender.Heartbeat],
classOf[ClusterHeartbeatSender.HeartbeatRsp],
classOf[GossipEnvelope],
classOf[GossipStatus],
classOf[MetricsGossipEnvelope],

View file

@ -32,9 +32,7 @@ class ClusterConfigSpec extends AkkaSpec {
GossipTimeToLive must be(2 seconds)
HeartbeatInterval must be(1 second)
MonitoredByNrOfMembers must be(5)
HeartbeatRequestDelay must be(10 seconds)
HeartbeatExpectedResponseAfter must be(5 seconds)
HeartbeatRequestTimeToLive must be(1 minute)
LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second)
PublishStatsInterval must be(Duration.Undefined)

View file

@ -7,100 +7,212 @@ package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import akka.routing.ConsistentHash
import scala.concurrent.duration._
import scala.collection.immutable
import scala.collection.immutable.HashSet
import akka.remote.FailureDetector
import akka.remote.DefaultFailureDetectorRegistry
import scala.concurrent.forkjoin.ThreadLocalRandom
object ClusterHeartbeatSenderStateSpec {
class FailureDetectorStub extends FailureDetector {
trait Status
object Up extends Status
object Down extends Status
object Unknown extends Status
private var status: Status = Unknown
def markNodeAsUnavailable(): Unit = status = Down
def markNodeAsAvailable(): Unit = status = Up
override def isAvailable: Boolean = status match {
case Unknown | Up true
case Down false
}
override def isMonitoring: Boolean = status != Unknown
override def heartbeat(): Unit = status = Up
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
import ClusterHeartbeatSenderStateSpec._
val selfAddress = Address("akka.tcp", "sys", "myself", 2552)
val aa = Address("akka.tcp", "sys", "aa", 2552)
val bb = Address("akka.tcp", "sys", "bb", 2552)
val cc = Address("akka.tcp", "sys", "cc", 2552)
val dd = Address("akka.tcp", "sys", "dd", 2552)
val ee = Address("akka.tcp", "sys", "ee", 2552)
val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1)
val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2)
val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3)
val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4)
val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5)
val emptyState = ClusterHeartbeatSenderState.empty(selfAddress, 3)
def emptyState: ClusterHeartbeatSenderState = emptyState(aa)
def emptyState(selfUniqueAddress: UniqueAddress) = ClusterHeartbeatSenderState(
ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), monitoredByNrOfMembers = 3),
unreachable = Set.empty[UniqueAddress],
failureDetector = new DefaultFailureDetectorRegistry[Address](() new FailureDetectorStub))
def fd(state: ClusterHeartbeatSenderState, node: UniqueAddress): FailureDetectorStub =
state.failureDetector.asInstanceOf[DefaultFailureDetectorRegistry[Address]].failureDetector(node.address).
get.asInstanceOf[FailureDetectorStub]
"A ClusterHeartbeatSenderState" must {
"return empty active set when no nodes" in {
emptyState.active.isEmpty must be(true)
emptyState.activeReceivers.isEmpty must be(true)
}
"include heartbeatRequest in active set" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds)
s.heartbeatRequest.keySet must be(Set(aa))
s.active must be(Set(aa))
"init with empty" in {
emptyState.init(Set.empty).activeReceivers must be(Set.empty)
}
"remove heartbeatRequest from active set after removeOverdueHeartbeatRequest" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now - 30.seconds).removeOverdueHeartbeatRequest()
s.heartbeatRequest must be(Map.empty)
s.active must be(Set.empty)
s.ending must be(Set(aa))
"init with self" in {
emptyState.init(Set(aa, bb, cc)).activeReceivers must be(Set(bb, cc))
}
"remove heartbeatRequest after reset" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb))
s.heartbeatRequest must be(Map.empty)
"init without self" in {
emptyState.init(Set(bb, cc)).activeReceivers must be(Set(bb, cc))
}
"remove heartbeatRequest after addMember" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).addMember(aa)
s.heartbeatRequest must be(Map.empty)
"use added members" in {
emptyState.addMember(bb).addMember(cc).activeReceivers must be(Set(bb, cc))
}
"remove heartbeatRequest after removeMember" in {
val s = emptyState.addHeartbeatRequest(aa, Deadline.now + 30.seconds).reset(HashSet(aa, bb)).removeMember(aa)
s.heartbeatRequest must be(Map.empty)
s.ending must be(Set(aa))
"not use removed members" in {
emptyState.addMember(bb).addMember(cc).removeMember(bb).activeReceivers must be(Set(cc))
}
"remove from ending after addHeartbeatRequest" in {
val s = emptyState.reset(HashSet(aa, bb)).removeMember(aa)
s.ending must be(Set(aa))
val s2 = s.addHeartbeatRequest(aa, Deadline.now + 30.seconds)
s2.heartbeatRequest.keySet must be(Set(aa))
s2.ending must be(Set.empty)
"use specified number of members" in {
// they are sorted by the hash (uid) of the UniqueAddress
emptyState.addMember(cc).addMember(dd).addMember(bb).addMember(ee).activeReceivers must be(Set(bb, cc, dd))
}
"include nodes from reset in active set" in {
val nodes = HashSet(aa, bb, cc)
val s = emptyState.reset(nodes)
s.current must be(nodes)
s.ending must be(Set.empty)
s.active must be(nodes)
"update failure detector in active set" in {
val s1 = emptyState.addMember(bb).addMember(cc).addMember(dd)
val s2 = s1.heartbeatRsp(bb).heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee)
s2.failureDetector.isMonitoring(bb.address) must be(true)
s2.failureDetector.isMonitoring(cc.address) must be(true)
s2.failureDetector.isMonitoring(dd.address) must be(true)
s2.failureDetector.isMonitoring(ee.address) must be(false)
}
"limit current nodes to monitoredByNrOfMembers when adding members" in {
val nodes = Set(aa, bb, cc, dd)
val s = nodes.foldLeft(emptyState) { _ addMember _ }
s.current.size must be(3)
s.addMember(ee).current.size must be(3)
"continue to use unreachable" in {
val s1 = emptyState.addMember(cc).addMember(dd).addMember(ee)
val s2 = s1.heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee)
fd(s2, ee).markNodeAsUnavailable()
s2.failureDetector.isAvailable(ee.address) must be(false)
s2.addMember(bb).activeReceivers must be(Set(bb, cc, dd, ee))
}
"move member to ending set when removing member" in {
val nodes = HashSet(aa, bb, cc, dd, ee)
val s = emptyState.reset(nodes)
s.ending must be(Set.empty)
val included = s.current.head
val s2 = s.removeMember(included)
s2.ending must be(Set(included))
s2.current must not contain (included)
val s3 = s2.addMember(included)
s3.current must contain(included)
s3.ending must not contain (included)
"remove unreachable when coming back" in {
val s1 = emptyState.addMember(cc).addMember(dd).addMember(ee)
val s2 = s1.heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee)
fd(s2, dd).markNodeAsUnavailable()
fd(s2, ee).markNodeAsUnavailable()
val s3 = s2.addMember(bb)
s3.activeReceivers must be(Set(bb, cc, dd, ee))
val s4 = s3.heartbeatRsp(bb).heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee)
s4.activeReceivers must be(Set(bb, cc, dd))
s4.failureDetector.isMonitoring(ee.address) must be(false)
}
"remove ending correctly" in {
val s = emptyState.reset(HashSet(aa)).removeMember(aa)
s.ending must be(Set(aa))
val s2 = s.removeEnding(aa)
s2.ending must be(Set.empty)
"remove unreachable when member removed" in {
val s1 = emptyState.addMember(cc).addMember(dd).addMember(ee)
val s2 = s1.heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee)
fd(s2, cc).markNodeAsUnavailable()
fd(s2, ee).markNodeAsUnavailable()
val s3 = s2.addMember(bb).heartbeatRsp(bb)
s3.activeReceivers must be(Set(bb, cc, dd, ee))
val s4 = s3.removeMember(cc).removeMember(ee)
s4.activeReceivers must be(Set(bb, dd))
s4.failureDetector.isMonitoring(cc.address) must be(false)
s4.failureDetector.isMonitoring(ee.address) must be(false)
}
"behave correctly for random operations" in {
val rnd = ThreadLocalRandom.current
val nodes = (1 to rnd.nextInt(10, 200)).map(n UniqueAddress(Address("akka.tcp", "sys", "n" + n, 2552), n)).toVector
def rndNode() = nodes(rnd.nextInt(0, nodes.size))
val selfUniqueAddress = rndNode()
var state = emptyState(selfUniqueAddress)
val Add = 0
val Remove = 1
val Unreachable = 2
val HeartbeatRsp = 3
for (i 1 to 100000) {
val operation = rnd.nextInt(Add, HeartbeatRsp + 1)
val node = rndNode()
try {
operation match {
case Add
if (node != selfUniqueAddress && !state.ring.nodes.contains(node)) {
val oldUnreachable = state.unreachable
state = state.addMember(node)
// keep unreachable
(oldUnreachable -- state.activeReceivers) must be(Set.empty)
state.failureDetector.isMonitoring(node.address) must be(false)
state.failureDetector.isAvailable(node.address) must be(true)
}
case Remove
if (node != selfUniqueAddress && state.ring.nodes.contains(node)) {
val oldUnreachable = state.unreachable
state = state.removeMember(node)
// keep unreachable, unless it was the removed
if (oldUnreachable(node))
(oldUnreachable -- state.activeReceivers) must be(Set(node))
else
(oldUnreachable -- state.activeReceivers) must be(Set.empty)
state.failureDetector.isMonitoring(node.address) must be(false)
state.failureDetector.isAvailable(node.address) must be(true)
state.activeReceivers must not contain (node)
}
case Unreachable
if (node != selfUniqueAddress && state.activeReceivers(node)) {
state.failureDetector.heartbeat(node.address) // make sure the fd is created
fd(state, node).markNodeAsUnavailable()
state.failureDetector.isMonitoring(node.address) must be(true)
state.failureDetector.isAvailable(node.address) must be(false)
}
case HeartbeatRsp
if (node != selfUniqueAddress && state.ring.nodes.contains(node)) {
val oldUnreachable = state.unreachable
val oldReceivers = state.activeReceivers
val oldRingReceivers = state.ring.myReceivers
state = state.heartbeatRsp(node)
if (oldUnreachable(node))
state.unreachable must not contain (node)
if (oldUnreachable(node) && !oldRingReceivers(node))
state.failureDetector.isMonitoring(node.address) must be(false)
if (oldRingReceivers(node))
state.failureDetector.isMonitoring(node.address) must be(true)
state.ring.myReceivers must be(oldRingReceivers)
state.failureDetector.isAvailable(node.address) must be(true)
}
}
} catch {
case e: Throwable
println(s"Failure context: i=$i, node=$node, op=$operation, unreachable=${state.unreachable}, " +
s"ringReceivers=${state.ring.myReceivers}, ringNodes=${state.ring.nodes}")
throw e
}
}
}
}
}

View file

@ -16,58 +16,21 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with ShouldMatchers {
val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("10000").toInt
def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = {
val nodes = (1 to size).map(n Address("akka.tcp", "sys", "node-" + n, 2552)).toSet
val selfAddress = Address("akka.tcp", "sys", "node-" + (size / 2), 2552)
HeartbeatNodeRing(selfAddress, nodes, 5)
}
def createClusterHeartbeatSenderStateOfSize(size: Int): ClusterHeartbeatSenderState = {
val nodes = (1 to size).map(n Address("akka.tcp", "sys", "node-" + n, 2552)).to[HashSet]
val selfAddress = Address("akka.tcp", "sys", "node-" + (size / 2), 2552)
ClusterHeartbeatSenderState.empty(selfAddress, 5).reset(nodes)
val nodes = (1 to size).map(n UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n))
val selfAddress = nodes(size / 2)
HeartbeatNodeRing(selfAddress, nodes.toSet, 5)
}
val heartbeatNodeRing = createHeartbeatNodeRingOfSize(nodesSize)
val heartbeatSenderState = createClusterHeartbeatSenderStateOfSize(nodesSize)
def checkThunkForRing(ring: HeartbeatNodeRing, thunk: HeartbeatNodeRing Unit, times: Int): Unit =
for (i 1 to times) thunk(ring)
def checkThunkForState(state: ClusterHeartbeatSenderState, thunk: ClusterHeartbeatSenderState Unit, times: Int): Unit =
for (i 1 to times) thunk(state)
def myReceivers(ring: HeartbeatNodeRing): Unit = {
val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers)
r.myReceivers.isEmpty should be(false)
}
def mySenders(ring: HeartbeatNodeRing): Unit = {
val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers)
r.mySenders.isEmpty should be(false)
}
def reset(state: ClusterHeartbeatSenderState): Unit = {
val s = ClusterHeartbeatSenderState.empty(state.ring.selfAddress, state.ring.monitoredByNrOfMembers).reset(
state.ring.nodes.asInstanceOf[HashSet[Address]])
s.active.isEmpty should be(false)
}
def addMember(state: ClusterHeartbeatSenderState): Unit = {
val s = state.addMember(Address("akka.tcp", "sys", "new-node", 2552))
s.active.isEmpty should be(false)
}
def removeMember(state: ClusterHeartbeatSenderState): Unit = {
val s = state.removeMember(Address("akka.tcp", "sys", "node-" + (nodesSize / 3), 2552))
s.active.isEmpty should be(false)
}
def addHeartbeatRequest(state: ClusterHeartbeatSenderState): Unit = {
val a = Address("akka.tcp", "sys", "node-" + (nodesSize / 3), 2552)
val s = state.addHeartbeatRequest(a, Deadline.now)
s.active should contain(a)
}
s"HeartbeatNodeRing of size $nodesSize" must {
s"do a warm up run, $iterations times" in {
@ -78,32 +41,6 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with ShouldMatchers {
checkThunkForRing(heartbeatNodeRing, myReceivers, iterations)
}
s"produce mySenders, $iterations times" in {
checkThunkForRing(heartbeatNodeRing, mySenders, iterations)
}
}
s"ClusterHeartbeatSenderState of size $nodesSize" must {
s"do a warm up run, $iterations times" in {
checkThunkForState(heartbeatSenderState, reset, iterations)
}
s"reset, $iterations times" in {
checkThunkForState(heartbeatSenderState, reset, iterations)
}
s"addMember node, $iterations times" in {
checkThunkForState(heartbeatSenderState, addMember, iterations)
}
s"removeMember node, $iterations times" in {
checkThunkForState(heartbeatSenderState, removeMember, iterations)
}
s"addHeartbeatRequest node, $iterations times" in {
checkThunkForState(heartbeatSenderState, addHeartbeatRequest, iterations)
}
}
}

View file

@ -14,11 +14,11 @@ import scala.collection.immutable
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class HeartbeatNodeRingSpec extends WordSpec with MustMatchers {
val aa = Address("akka.tcp", "sys", "aa", 2552)
val bb = Address("akka.tcp", "sys", "bb", 2552)
val cc = Address("akka.tcp", "sys", "cc", 2552)
val dd = Address("akka.tcp", "sys", "dd", 2552)
val ee = Address("akka.tcp", "sys", "ee", 2552)
val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1)
val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2)
val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3)
val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4)
val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5)
val nodes = Set(aa, bb, cc, dd, ee)
@ -42,19 +42,9 @@ class HeartbeatNodeRingSpec extends WordSpec with MustMatchers {
HeartbeatNodeRing(cc, nodes, 6).myReceivers must be(expected)
}
"have matching senders and receivers" in {
val ring = HeartbeatNodeRing(cc, nodes, 3)
ring.mySenders must be(ring.senders(cc))
for (sender nodes; receiver ring.receivers(sender)) {
ring.senders(receiver) must contain(sender)
}
}
"pick none when alone" in {
val ring = HeartbeatNodeRing(cc, Set(cc), 3)
ring.myReceivers must be(Set())
ring.mySenders must be(Set())
}
}

View file

@ -52,10 +52,8 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
checkSerialization(InternalClusterAction.InitJoin)
checkSerialization(InternalClusterAction.InitJoinAck(address))
checkSerialization(InternalClusterAction.InitJoinNack(address))
checkSerialization(ClusterHeartbeatReceiver.Heartbeat(address))
checkSerialization(ClusterHeartbeatReceiver.EndHeartbeat(address))
checkSerialization(ClusterHeartbeatReceiver.EndHeartbeatAck(address))
checkSerialization(ClusterHeartbeatSender.HeartbeatRequest(address))
checkSerialization(ClusterHeartbeatSender.Heartbeat(address))
checkSerialization(ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress))
val node1 = VectorClock.Node("node1")
val node2 = VectorClock.Node("node2")

View file

@ -92,3 +92,10 @@ without much trouble.
Read more about the new routers in the :ref:`documentation for Scala <routing-scala>` and
:ref:`documentation for Java <routing-java>`.
Changed cluster expected-response-after configuration
=====================================================
Configuration property ``akka.cluster.failure-detector.heartbeat-request.expected-response-after``
has been renamed to ``akka.cluster.failure-detector.expected-response-after``.