Detect failure when no heartbeats sent, see #2907
* Subscribe to InstantMemberEvent and start heartbeating on InstantMemberUp. Same for metrics.
* HeartbeatNodeRing data structure for bidirectional mapping of heartbeat sender and receiver. ConsistentHash is no longer used. Node addresses are hashed to ensure that neighbors are spread out.
* HeartbeatRequest when the receiver detects that it has not received the expected heartbeats.
* New test InitialHeartbeatSpec that simulates the problem.
* Add/remove some related conf properties.
* Add some more logging to be able to diagnose potential problems.
* Explicit config of nr-of-end-heartbeats.
This commit is contained in:
parent
c5685a0855
commit
8b4e903e7d
25 changed files with 466 additions and 146 deletions
|
|
@ -181,7 +181,6 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
|
|||
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
|
||||
import ClusterLeaderAction._
|
||||
import InternalClusterAction._
|
||||
import ClusterHeartbeatSender.JoinInProgress
|
||||
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.{ selfAddress, scheduler, failureDetector }
|
||||
|
|
@ -281,7 +280,13 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
* A 'Join(thisNodeAddress)' command is sent to the node to join.
|
||||
*/
|
||||
def join(address: Address): Unit = {
|
||||
if (!latestGossip.members.exists(_.address == address)) {
|
||||
if (address.protocol != selfAddress.protocol)
|
||||
log.info("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
|
||||
selfAddress.protocol, address.protocol)
|
||||
else if (address.system != selfAddress.system)
|
||||
log.info("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
|
||||
selfAddress.system, address.system)
|
||||
else if (!latestGossip.members.exists(_.address == address)) {
|
||||
// wipe our state since a node that joins a cluster must be empty
|
||||
latestGossip = Gossip.empty
|
||||
// wipe the failure detector since we are starting fresh and shouldn't care about the past
|
||||
|
|
@ -290,7 +295,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
publisher ! PublishStart
|
||||
|
||||
publish(latestGossip)
|
||||
heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
|
||||
|
||||
context.become(initialized)
|
||||
if (address == selfAddress)
|
||||
|
|
@ -331,7 +335,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
|
||||
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
|
||||
if (node != selfAddress) {
|
||||
failureDetector heartbeat node
|
||||
gossipTo(node)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue