clu #20309 API for pluggable cluster downing

This commit is contained in:
Johan Andrén 2016-04-11 10:33:02 +02:00
parent 2418e610ab
commit 5671927cf1
9 changed files with 223 additions and 11 deletions

View file

@ -393,7 +393,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
import settings.tuningParameters._
val cluster = Cluster(context.system)
val removalMargin = cluster.settings.DownRemovalMargin
val removalMargin = cluster.downingProvider.downRemovalMargin
var state = State.empty.withRememberEntities(settings.rememberEntities)
var rebalanceInProgress = Set.empty[ShardId]

View file

@ -381,7 +381,7 @@ class ClusterSingletonManager(
s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
val removalMargin =
if (settings.removalMargin <= Duration.Zero) cluster.settings.DownRemovalMargin
if (settings.removalMargin <= Duration.Zero) cluster.downingProvider.downRemovalMargin
else settings.removalMargin
val (maxHandOverRetries, maxTakeOverRetries) = {

View file

@ -27,6 +27,7 @@ akka {
# Using auto-down implies that two separate clusters will automatically be
# formed in case of network partition.
# Disable with "off" or specify a duration to enable auto-down.
# If a downing-provider-class is configured this setting is ignored.
auto-down-unreachable-after = off
# Time margin after which shards or singletons that belonged to a downed/removed
@ -40,6 +41,15 @@ akka {
# Disable with "off" or specify a duration to enable.
down-removal-margin = off
# Pluggable support for downing of nodes in the cluster.
# If this setting is left empty behaviour will depend on 'auto-down-unreachable' in the following ways:
# * if it is 'off' the `NoDowning` provider is used and no automatic downing will be performed
# * if it is set to a duration the `AutoDowning` provider is with the configured downing duration
#
# If specified the value must be the fully qualified class name of a subclass of
# `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
downing-provider-class = ""
# By default, the leader will not move 'Joining' members to 'Up' during a network
# split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp'
# so they become part of the cluster even during a network split. The leader will

View file

@ -3,14 +3,13 @@
*/
package akka.cluster
import akka.actor.Actor
import akka.ConfigurationException
import akka.actor.{ Actor, ActorSystem, Address, Cancellable, Props, Scheduler }
import scala.concurrent.duration.FiniteDuration
import akka.actor.Props
import akka.cluster.ClusterEvent._
import akka.actor.Cancellable
import scala.concurrent.duration.Duration
import akka.actor.Address
import akka.actor.Scheduler
/**
* INTERNAL API
@ -23,6 +22,24 @@ private[cluster] object AutoDown {
final case class UnreachableTimeout(node: UniqueAddress)
}
/**
* Used when no custom provider is configured and 'auto-down-unreachable-after' is enabled.
*/
final class AutoDowning(system: ActorSystem) extends DowningProvider {
private def clusterSettings = Cluster(system).settings
override def downRemovalMargin: FiniteDuration = clusterSettings.DownRemovalMargin
override def downingActorProps: Option[Props] =
clusterSettings.AutoDownUnreachableAfter match {
case d: FiniteDuration Some(AutoDown.props(d))
case _
// I don't think this can actually happen
throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set")
}
}
/**
* INTERNAL API
*

View file

@ -102,6 +102,10 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
new DefaultFailureDetectorRegistry(() createFailureDetector())
}
// needs to be lazy to allow downing provider impls to access Cluster (if not we get deadlock)
lazy val downingProvider: DowningProvider =
DowningProvider.load(settings.DowningProviderClassName, system)
// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================

View file

@ -57,6 +57,7 @@ private[cluster] object InternalClusterAction {
/**
* Command to join the cluster. Sent when a node wants to join another node (the receiver).
*
* @param node the node that wants to join the cluster
*/
@SerialVersionUID(1L)
@ -64,6 +65,7 @@ private[cluster] object InternalClusterAction {
/**
* Reply to Join
*
* @param from the sender node in the cluster, i.e. the node that received the Join command
*/
@SerialVersionUID(1L)
@ -293,10 +295,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[QuarantinedEvent])
AutoDownUnreachableAfter match {
case d: FiniteDuration
context.actorOf(AutoDown.props(d) withDispatcher (context.props.dispatcher), name = "autoDown")
case _ // auto-down is disabled
cluster.downingProvider.downingActorProps.foreach { props
val propsWithDispatcher =
if (props.dispatcher == Deploy.NoDispatcherGiven) props.withDispatcher(context.props.dispatcher)
else props
context.actorOf(propsWithDispatcher, name = "downingProvider")
}
if (seedNodes.isEmpty)

View file

@ -6,12 +6,14 @@ package akka.cluster
import scala.collection.immutable
import com.typesafe.config.Config
import com.typesafe.config.ConfigObject
import scala.concurrent.duration.Duration
import akka.actor.Address
import akka.actor.AddressFromURIString
import akka.dispatch.Dispatchers
import akka.util.Helpers.Requiring
import akka.util.Helpers.ConfigOps
import scala.concurrent.duration.FiniteDuration
import akka.japi.Util.immutableSeq
import java.util.Locale
@ -58,6 +60,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
}
}
// specific to the [[akka.cluster.DefaultDowningProvider]]
val AutoDownUnreachableAfter: Duration = {
val key = "auto-down-unreachable-after"
cc.getString(key).toLowerCase(Locale.ROOT) match {
@ -66,6 +69,12 @@ final class ClusterSettings(val config: Config, val systemName: String) {
}
}
/**
* @deprecated Specific to [[akka.cluster.AutoDown]] should not be used anywhere else, instead
* ``Cluster.downingProvider.downRemovalMargin`` should be used as it allows the downing provider to decide removal
* margins
*/
@deprecated("Use Cluster.downingProvider.downRemovalMargin", since = "2.4.5")
val DownRemovalMargin: FiniteDuration = {
val key = "down-removal-margin"
cc.getString(key).toLowerCase(Locale.ROOT) match {
@ -74,6 +83,13 @@ final class ClusterSettings(val config: Config, val systemName: String) {
}
}
val DowningProviderClassName: String = {
val name = cc.getString("downing-provider-class")
if (name.nonEmpty) name
else if (AutoDownUnreachableAfter.isFinite()) classOf[AutoDowning].getName
else classOf[NoDowning].getName
}
val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet

View file

@ -0,0 +1,69 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import akka.ConfigurationException
import akka.actor.{ ActorSystem, ExtendedActorSystem, Props }
import com.typesafe.config.Config
import scala.concurrent.duration.{ Duration, FiniteDuration }
/**
* INTERNAL API
*/
private[cluster] object DowningProvider {
/**
* @param fqcn Fully qualified class name of the implementation to be loaded.
* @param system Actor system used to load the implemntation
* @return the provider or throws a [[akka.ConfigurationException]] if loading it fails
*/
def load(fqcn: String, system: ActorSystem): DowningProvider = {
val eas = system.asInstanceOf[ExtendedActorSystem]
eas.dynamicAccess.createInstanceFor[DowningProvider](
fqcn,
List((classOf[ActorSystem], system))).recover {
case e throw new ConfigurationException(
s"Could not create cluster downing provider [$fqcn]", e)
}.get
}
}
/**
* API for plugins that will handle downing of cluster nodes. Concrete plugins must subclass and
* have a public one argument constructor accepting an [[akka.actor.ActorSystem]].
*/
abstract class DowningProvider {
/**
* Time margin after which shards or singletons that belonged to a downed/removed
* partition are created in surviving partition. The purpose of this margin is that
* in case of a network partition the persistent actors in the non-surviving partitions
* must be stopped before corresponding persistent actors are started somewhere else.
* This is useful if you implement downing strategies that handle network partitions,
* e.g. by keeping the larger side of the partition and shutting down the smaller side.
*/
def downRemovalMargin: FiniteDuration
/**
* If a props is returned it is created as a child of the core cluster daemon on cluster startup.
* It should then handle downing using the regular [[akka.cluster.Cluster]] APIs.
* The actor will run on the same dispatcher as the cluster actor if dispatcher not configured.
*
* May throw an exception which will then immediately lead to Cluster stopping, as the downing
* provider is vital to a working cluster.
*/
def downingActorProps: Option[Props]
}
/**
* Default downing provider used when no provider is configured and 'auto-down-unreachable-after'
* is not enabled.
*/
final class NoDowning(system: ActorSystem) extends DowningProvider {
override def downRemovalMargin: FiniteDuration = Cluster(system).settings.DownRemovalMargin
override val downingActorProps: Option[Props] = None
}

View file

@ -0,0 +1,92 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import java.util.concurrent.atomic.AtomicBoolean
import akka.ConfigurationException
import akka.actor.{ ActorSystem, Props }
import akka.testkit.TestKit.{ awaitCond, shutdownActorSystem }
import akka.testkit.{ TestKit, TestProbe }
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Futures
import org.scalatest.{ Matchers, WordSpec }
import scala.concurrent.duration._
class FailingDowningProvider(system: ActorSystem) extends DowningProvider {
override val downRemovalMargin: FiniteDuration = 20.seconds
override def downingActorProps: Option[Props] = {
throw new ConfigurationException("this provider never works")
}
}
class DummyDowningProvider(system: ActorSystem) extends DowningProvider {
override val downRemovalMargin: FiniteDuration = 20.seconds
val actorPropsAccessed = new AtomicBoolean(false)
override val downingActorProps: Option[Props] = {
actorPropsAccessed.set(true)
None
}
}
class DowningProviderSpec extends WordSpec with Matchers {
val baseConf = ConfigFactory.parseString(
"""
akka {
loglevel = WARNING
actor.provider = "akka.cluster.ClusterActorRefProvider"
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
}
""").withFallback(ConfigFactory.load())
"The downing provider mechanism" should {
"default to akka.cluster.NoDowning" in {
val system = ActorSystem("default", baseConf)
Cluster(system).downingProvider shouldBe an[NoDowning]
shutdownActorSystem(system)
}
"use akka.cluster.AutoDowning if 'auto-down-unreachable-after' is configured" in {
val system = ActorSystem("auto-downing", ConfigFactory.parseString(
"""
akka.cluster.auto-down-unreachable-after = 18d
""").withFallback(baseConf))
Cluster(system).downingProvider shouldBe an[AutoDowning]
shutdownActorSystem(system)
}
"use the specified downing provider" in {
val system = ActorSystem("auto-downing", ConfigFactory.parseString(
"""
akka.cluster.downing-provider-class="akka.cluster.DummyDowningProvider"
""").withFallback(baseConf))
Cluster(system).downingProvider shouldBe a[DummyDowningProvider]
awaitCond(Cluster(system).downingProvider.asInstanceOf[DummyDowningProvider].actorPropsAccessed.get(), 3.seconds)
shutdownActorSystem(system)
}
"stop the cluster if the downing provider throws exception in props method" in {
val system = ActorSystem("auto-downing", ConfigFactory.parseString(
"""
akka.cluster.downing-provider-class="akka.cluster.FailingDowningProvider"
""").withFallback(baseConf))
val cluster = Cluster(system)
cluster.join(cluster.selfAddress)
awaitCond(cluster.isTerminated, 3.seconds)
shutdownActorSystem(system)
}
}
}