diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index f9c6e7e2ae..12bc13c9c5 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -393,7 +393,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti import settings.tuningParameters._ val cluster = Cluster(context.system) - val removalMargin = cluster.settings.DownRemovalMargin + val removalMargin = cluster.downingProvider.downRemovalMargin var state = State.empty.withRememberEntities(settings.rememberEntities) var rebalanceInProgress = Set.empty[ShardId] diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 1371f88b13..975c79083d 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -381,7 +381,7 @@ class ClusterSingletonManager( s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]") val removalMargin = - if (settings.removalMargin <= Duration.Zero) cluster.settings.DownRemovalMargin + if (settings.removalMargin <= Duration.Zero) cluster.downingProvider.downRemovalMargin else settings.removalMargin val (maxHandOverRetries, maxTakeOverRetries) = { diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 0decb2e58e..3dc83b0db4 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -27,6 +27,7 @@ akka { # Using auto-down implies that two separate clusters will automatically be # formed in case of network 
partition. # Disable with "off" or specify a duration to enable auto-down. + # If a downing-provider-class is configured this setting is ignored. auto-down-unreachable-after = off # Time margin after which shards or singletons that belonged to a downed/removed @@ -40,6 +41,15 @@ akka { # Disable with "off" or specify a duration to enable. down-removal-margin = off + # Pluggable support for downing of nodes in the cluster. + # If this setting is left empty behaviour will depend on 'auto-down-unreachable-after' in the following ways: + # * if it is 'off' the `NoDowning` provider is used and no automatic downing will be performed + # * if it is set to a duration the `AutoDowning` provider is used with the configured downing duration + # + # If specified the value must be the fully qualified class name of a subclass of + # `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem` + downing-provider-class = "" + # By default, the leader will not move 'Joining' members to 'Up' during a network # split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp' # so they become part of the cluster even during a network split. 
The leader will diff --git a/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala b/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala index e183f3e5ab..b06b4bfc14 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala @@ -3,14 +3,13 @@ */ package akka.cluster -import akka.actor.Actor +import akka.ConfigurationException +import akka.actor.{ Actor, ActorSystem, Address, Cancellable, Props, Scheduler } + import scala.concurrent.duration.FiniteDuration -import akka.actor.Props import akka.cluster.ClusterEvent._ -import akka.actor.Cancellable + import scala.concurrent.duration.Duration -import akka.actor.Address -import akka.actor.Scheduler /** * INTERNAL API @@ -23,6 +22,24 @@ private[cluster] object AutoDown { final case class UnreachableTimeout(node: UniqueAddress) } +/** + * Used when no custom provider is configured and 'auto-down-unreachable-after' is enabled. + */ +final class AutoDowning(system: ActorSystem) extends DowningProvider { + + private def clusterSettings = Cluster(system).settings + + override def downRemovalMargin: FiniteDuration = clusterSettings.DownRemovalMargin + + override def downingActorProps: Option[Props] = + clusterSettings.AutoDownUnreachableAfter match { + case d: FiniteDuration ⇒ Some(AutoDown.props(d)) + case _ ⇒ + // reached only when AutoDowning is set explicitly as 'downing-provider-class' without a finite 'auto-down-unreachable-after' + throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set") + } +} + /** * INTERNAL API * diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 72401e9b5f..013ce73485 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -102,6 +102,10 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector()) } + // 
needs to be lazy to allow downing provider impls to access Cluster (if not we get deadlock) + lazy val downingProvider: DowningProvider = + DowningProvider.load(settings.DowningProviderClassName, system) + // ======================================================== // ===================== WORK DAEMONS ===================== // ======================================================== diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 3e2acdfae9..185004856a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -57,6 +57,7 @@ private[cluster] object InternalClusterAction { /** * Command to join the cluster. Sent when a node wants to join another node (the receiver). + * * @param node the node that wants to join the cluster */ @SerialVersionUID(1L) @@ -64,6 +65,7 @@ private[cluster] object InternalClusterAction { /** * Reply to Join + * * @param from the sender node in the cluster, i.e. 
the node that received the Join command */ @SerialVersionUID(1L) @@ -293,10 +295,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with override def preStart(): Unit = { context.system.eventStream.subscribe(self, classOf[QuarantinedEvent]) - AutoDownUnreachableAfter match { - case d: FiniteDuration ⇒ - context.actorOf(AutoDown.props(d) withDispatcher (context.props.dispatcher), name = "autoDown") - case _ ⇒ // auto-down is disabled + cluster.downingProvider.downingActorProps.foreach { props ⇒ + val propsWithDispatcher = + if (props.dispatcher == Deploy.NoDispatcherGiven) props.withDispatcher(context.props.dispatcher) + else props + + context.actorOf(propsWithDispatcher, name = "downingProvider") } if (seedNodes.isEmpty) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index f55d02ae94..711e7de8da 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -6,12 +6,14 @@ package akka.cluster import scala.collection.immutable import com.typesafe.config.Config import com.typesafe.config.ConfigObject + import scala.concurrent.duration.Duration import akka.actor.Address import akka.actor.AddressFromURIString import akka.dispatch.Dispatchers import akka.util.Helpers.Requiring import akka.util.Helpers.ConfigOps + import scala.concurrent.duration.FiniteDuration import akka.japi.Util.immutableSeq import java.util.Locale @@ -58,6 +60,7 @@ final class ClusterSettings(val config: Config, val systemName: String) { } } + // specific to the [[akka.cluster.AutoDowning]] downing provider val AutoDownUnreachableAfter: Duration = { val key = "auto-down-unreachable-after" cc.getString(key).toLowerCase(Locale.ROOT) match { @@ -66,6 +69,12 @@ final class ClusterSettings(val config: Config, val systemName: String) { } } + /** + * @deprecated Specific to [[akka.cluster.AutoDown]] should 
not be used anywhere else, instead + * ``Cluster.downingProvider.downRemovalMargin`` should be used as it allows the downing provider to decide removal + * margins + */ + @deprecated("Use Cluster.downingProvider.downRemovalMargin", since = "2.4.5") val DownRemovalMargin: FiniteDuration = { val key = "down-removal-margin" cc.getString(key).toLowerCase(Locale.ROOT) match { @@ -74,6 +83,13 @@ final class ClusterSettings(val config: Config, val systemName: String) { } } + val DowningProviderClassName: String = { + val name = cc.getString("downing-provider-class") + if (name.nonEmpty) name + else if (AutoDownUnreachableAfter.isFinite()) classOf[AutoDowning].getName + else classOf[NoDowning].getName + } + val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members") val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet diff --git a/akka-cluster/src/main/scala/akka/cluster/DowningProvider.scala b/akka-cluster/src/main/scala/akka/cluster/DowningProvider.scala new file mode 100644 index 0000000000..b307902617 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/DowningProvider.scala @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.cluster + +import akka.ConfigurationException +import akka.actor.{ ActorSystem, ExtendedActorSystem, Props } +import com.typesafe.config.Config + +import scala.concurrent.duration.{ Duration, FiniteDuration } + +/** + * INTERNAL API + */ +private[cluster] object DowningProvider { + + /** + * @param fqcn Fully qualified class name of the implementation to be loaded. 
+ * @param system Actor system used to load the implementation + * @return the provider or throws a [[akka.ConfigurationException]] if loading it fails + */ + def load(fqcn: String, system: ActorSystem): DowningProvider = { + val eas = system.asInstanceOf[ExtendedActorSystem] + eas.dynamicAccess.createInstanceFor[DowningProvider]( + fqcn, + List((classOf[ActorSystem], system))).recover { + case e ⇒ throw new ConfigurationException( + s"Could not create cluster downing provider [$fqcn]", e) + }.get + } + +} + +/** + * API for plugins that will handle downing of cluster nodes. Concrete plugins must subclass and + * have a public one argument constructor accepting an [[akka.actor.ActorSystem]]. + */ +abstract class DowningProvider { + + /** + * Time margin after which shards or singletons that belonged to a downed/removed + * partition are created in surviving partition. The purpose of this margin is that + * in case of a network partition the persistent actors in the non-surviving partitions + * must be stopped before corresponding persistent actors are started somewhere else. + * This is useful if you implement downing strategies that handle network partitions, + * e.g. by keeping the larger side of the partition and shutting down the smaller side. + */ + def downRemovalMargin: FiniteDuration + + /** + * If a props is returned it is created as a child of the core cluster daemon on cluster startup. + * It should then handle downing using the regular [[akka.cluster.Cluster]] APIs. + * The actor will run on the same dispatcher as the cluster actor if dispatcher not configured. + * + * May throw an exception which will then immediately lead to Cluster stopping, as the downing + * provider is vital to a working cluster. + */ + def downingActorProps: Option[Props] + +} + +/** + * Default downing provider used when no provider is configured and 'auto-down-unreachable-after' + * is not enabled. 
+ */ +final class NoDowning(system: ActorSystem) extends DowningProvider { + override def downRemovalMargin: FiniteDuration = Cluster(system).settings.DownRemovalMargin + override val downingActorProps: Option[Props] = None +} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/DowningProviderSpec.scala b/akka-cluster/src/test/scala/akka/cluster/DowningProviderSpec.scala new file mode 100644 index 0000000000..87025c791b --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/DowningProviderSpec.scala @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.cluster + +import java.util.concurrent.atomic.AtomicBoolean + +import akka.ConfigurationException +import akka.actor.{ ActorSystem, Props } +import akka.testkit.TestKit.{ awaitCond, shutdownActorSystem } +import akka.testkit.{ TestKit, TestProbe } +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.Futures +import org.scalatest.{ Matchers, WordSpec } + +import scala.concurrent.duration._ + +class FailingDowningProvider(system: ActorSystem) extends DowningProvider { + override val downRemovalMargin: FiniteDuration = 20.seconds + override def downingActorProps: Option[Props] = { + throw new ConfigurationException("this provider never works") + } +} + +class DummyDowningProvider(system: ActorSystem) extends DowningProvider { + override val downRemovalMargin: FiniteDuration = 20.seconds + + val actorPropsAccessed = new AtomicBoolean(false) + override val downingActorProps: Option[Props] = { + actorPropsAccessed.set(true) + None + } +} + +class DowningProviderSpec extends WordSpec with Matchers { + + val baseConf = ConfigFactory.parseString( + """ + akka { + loglevel = WARNING + actor.provider = "akka.cluster.ClusterActorRefProvider" + remote { + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } + } + """).withFallback(ConfigFactory.load()) + + "The downing provider mechanism" should { + + "default to akka.cluster.NoDowning" in { + 
val system = ActorSystem("default", baseConf) + Cluster(system).downingProvider shouldBe an[NoDowning] + shutdownActorSystem(system) + } + + "use akka.cluster.AutoDowning if 'auto-down-unreachable-after' is configured" in { + val system = ActorSystem("auto-downing", ConfigFactory.parseString( + """ + akka.cluster.auto-down-unreachable-after = 18d + """).withFallback(baseConf)) + Cluster(system).downingProvider shouldBe an[AutoDowning] + shutdownActorSystem(system) + } + + "use the specified downing provider" in { + val system = ActorSystem("auto-downing", ConfigFactory.parseString( + """ + akka.cluster.downing-provider-class="akka.cluster.DummyDowningProvider" + """).withFallback(baseConf)) + + Cluster(system).downingProvider shouldBe a[DummyDowningProvider] + awaitCond(Cluster(system).downingProvider.asInstanceOf[DummyDowningProvider].actorPropsAccessed.get(), 3.seconds) + shutdownActorSystem(system) + } + + "stop the cluster if the downing provider throws exception in props method" in { + val system = ActorSystem("auto-downing", ConfigFactory.parseString( + """ + akka.cluster.downing-provider-class="akka.cluster.FailingDowningProvider" + """).withFallback(baseConf)) + val cluster = Cluster(system) + cluster.join(cluster.selfAddress) + + awaitCond(cluster.isTerminated, 3.seconds) + shutdownActorSystem(system) + } + + } +}