=clt #16948 Use min retries for singleton leaving scenario"
* In 2.4 we derive the number of hand-over/take-over retries from the removal margin, but we decided to set that to 0 by default, since it is intended for network partition scenarios. maxTakeOverRetries became 1. So there must be also be a min number of retries property. * The test failed for the leaving scenario because the singleton instance was stopped hard without sending the terminationMessage when the maxTakeOverRetries was exceeded.
This commit is contained in:
parent
c57b4e24c8
commit
6d036ca00c
3 changed files with 20 additions and 2 deletions
|
|
@ -141,6 +141,11 @@ akka.cluster.singleton {
|
||||||
# the previous oldest confirms that the hand over has started or the previous
|
# the previous oldest confirms that the hand over has started or the previous
|
||||||
# oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
|
# oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
|
||||||
hand-over-retry-interval = 1s
|
hand-over-retry-interval = 1s
|
||||||
|
|
||||||
|
# The number of retries are derived from hand-over-retry-interval and
|
||||||
|
# akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
|
||||||
|
# but it will never be less than this property.
|
||||||
|
min-number-of-hand-over-retries = 10
|
||||||
}
|
}
|
||||||
# //#singleton-config
|
# //#singleton-config
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -387,7 +387,13 @@ class ClusterSingletonManager(
|
||||||
|
|
||||||
val (maxHandOverRetries, maxTakeOverRetries) = {
|
val (maxHandOverRetries, maxTakeOverRetries) = {
|
||||||
val n = (removalMargin.toMillis / handOverRetryInterval.toMillis).toInt
|
val n = (removalMargin.toMillis / handOverRetryInterval.toMillis).toInt
|
||||||
(n + 3, math.max(1, n - 3))
|
val minRetries = context.system.settings.config.getInt(
|
||||||
|
"akka.cluster.singleton.min-number-of-hand-over-retries")
|
||||||
|
require(minRetries >= 1, "min-number-of-hand-over-retries must be >= 1")
|
||||||
|
val handOverRetries = math.max(minRetries, n + 3)
|
||||||
|
val takeOverRetries = math.max(1, handOverRetries - 3)
|
||||||
|
|
||||||
|
(handOverRetries, takeOverRetries)
|
||||||
}
|
}
|
||||||
|
|
||||||
// started when when self member is Up
|
// started when when self member is Up
|
||||||
|
|
|
||||||
|
|
@ -115,15 +115,21 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
|
||||||
/**
|
/**
|
||||||
* The Singleton actor
|
* The Singleton actor
|
||||||
*/
|
*/
|
||||||
class Consumer(queue: ActorRef, delegateTo: ActorRef) extends Actor {
|
class Consumer(queue: ActorRef, delegateTo: ActorRef) extends Actor with ActorLogging {
|
||||||
|
|
||||||
import Consumer._
|
import Consumer._
|
||||||
import PointToPointChannel._
|
import PointToPointChannel._
|
||||||
|
|
||||||
var current = 0
|
var current = 0
|
||||||
|
var stoppedBeforeUnregistration = true
|
||||||
|
|
||||||
override def preStart(): Unit = queue ! RegisterConsumer
|
override def preStart(): Unit = queue ! RegisterConsumer
|
||||||
|
|
||||||
|
override def postStop(): Unit = {
|
||||||
|
if (stoppedBeforeUnregistration)
|
||||||
|
log.warning("Stopped before unregistration")
|
||||||
|
}
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case n: Int if n <= current ⇒
|
case n: Int if n <= current ⇒
|
||||||
context.stop(self)
|
context.stop(self)
|
||||||
|
|
@ -138,6 +144,7 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
|
||||||
case End ⇒
|
case End ⇒
|
||||||
queue ! UnregisterConsumer
|
queue ! UnregisterConsumer
|
||||||
case UnregistrationOk ⇒
|
case UnregistrationOk ⇒
|
||||||
|
stoppedBeforeUnregistration = false
|
||||||
context stop self
|
context stop self
|
||||||
case Ping ⇒
|
case Ping ⇒
|
||||||
sender() ! Pong
|
sender() ! Pong
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue