diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index 978438b4be..a24dd0faa9 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -54,6 +54,11 @@ akka {
     # `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
     downing-provider-class = ""
 
+    # Artery only setting
+    # When a node has been gracefully removed, let this time pass (to allow for example
+    # cluster singleton handover to complete) and then quarantine the removed node.
+    quarantine-removed-node-after = 30s
+
     # By default, the leader will not move 'Joining' members to 'Up' during a network
     # split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp'
     # so they become part of the cluster even during a network split. The leader will
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala
index 534d23e273..af6d3cba2f 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala
@@ -52,9 +52,12 @@ private[cluster] class ClusterRemoteWatcher(
   unreachableReaperInterval,
   heartbeatExpectedResponseAfter) {
 
+  private val arteryEnabled = RARP(context.system).provider.remoteSettings.Artery.Enabled
   val cluster = Cluster(context.system)
   import cluster.selfAddress
 
+  private final case class DelayedQuarantine(m: Member, previousStatus: MemberStatus) extends NoSerializationVerificationNeeded
+
   var clusterNodes: Set[Address] = Set.empty
 
   override def preStart(): Unit = {
@@ -74,10 +77,11 @@ private[cluster] class ClusterRemoteWatcher(
       clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
       clusterNodes foreach takeOverResponsibility
       unreachable = unreachable diff clusterNodes
-    case MemberUp(m)                      ⇒ memberUp(m)
-    case MemberWeaklyUp(m)                ⇒ memberUp(m)
-    case MemberRemoved(m, previousStatus) ⇒ memberRemoved(m, previousStatus)
-    case _: MemberEvent                   ⇒ // not interesting
+    case MemberUp(m)                          ⇒ memberUp(m)
+    case MemberWeaklyUp(m)                    ⇒ memberUp(m)
+    case MemberRemoved(m, previousStatus)     ⇒ memberRemoved(m, previousStatus)
+    case _: MemberEvent                       ⇒ // not interesting
+    case DelayedQuarantine(m, previousStatus) ⇒ delayedQuarantine(m, previousStatus)
   }
 
   def memberUp(m: Member): Unit =
@@ -90,14 +94,26 @@ private[cluster] class ClusterRemoteWatcher(
   def memberRemoved(m: Member, previousStatus: MemberStatus): Unit =
     if (m.address != selfAddress) {
       clusterNodes -= m.address
-      // The reason we don't quarantine gracefully removed members (leaving) is that
-      // Cluster Singleton need to exchange TakeOver/HandOver messages.
+
       if (previousStatus == MemberStatus.Down) {
         quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
+      } else if (arteryEnabled) {
+        // don't quarantine gracefully removed members (leaving) directly,
+        // give Cluster Singleton some time to exchange TakeOver/HandOver messages.
+        import context.dispatcher
+        context.system.scheduler.scheduleOnce(cluster.settings.QuarantineRemovedNodeAfter, self, DelayedQuarantine(m, previousStatus))
       }
+
       publishAddressTerminated(m.address)
     }
 
+  /**
+   * Quarantines the removed member `m`. Called when the `DelayedQuarantine` message
+   * that `memberRemoved` scheduled for a gracefully removed member is received.
+   */
+  def delayedQuarantine(m: Member, previousStatus: MemberStatus): Unit =
+    quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
+
   override def watchNode(watchee: InternalActorRef) =
     if (!clusterNodes(watchee.path.address)) super.watchNode(watchee)
 
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index 682a7cb849..462a8ca01a 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -88,6 +88,9 @@ final class ClusterSettings(val config: Config, val systemName: String) {
     else classOf[NoDowning].getName
   }
 
+  val QuarantineRemovedNodeAfter: FiniteDuration =
+    cc.getMillisDuration("quarantine-removed-node-after") requiring (_ > Duration.Zero, "quarantine-removed-node-after must be > 0")
+
   val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
 
   val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet