diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 606ad1c54b..287197abb8 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -15,12 +15,25 @@ akka { # Leave as empty if the node is supposed to be joined manually. seed-nodes = [] - # how long to wait for one of the seed nodes to reply to initial join request + # How long to wait for one of the seed nodes to reply to initial join request. + # When this is the first seed node and there is no positive reply from the other + # seed nodes within this timeout it will join itself to bootstrap the cluster. + # When this is not the first seed node the join attempts will be performed with + # this interval. seed-node-timeout = 5s # If a join request fails it will be retried after this period. # Disable join retry by specifying "off". retry-unsuccessful-join-after = 10s + + # The joining of given seed nodes will by default be retried indefinitely until + # a successful join. That process can be aborted if unsuccessful by defining this + # timeout. When aborted it will run CoordinatedShutdown, which by default will + # terminate the ActorSystem. CoordinatedShutdown can also be configured to exit + # the JVM. It is useful to define this timeout if the seed-nodes are assembled + # dynamically and a restart with new seed-nodes should be tried after unsuccessful + # attempts. + shutdown-after-unsuccessful-join-seed-nodes = off # Should the 'leader' in the cluster be allowed to automatically mark # unreachable nodes as DOWN after a configured time of unreachability? diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index b24b1249c2..7881eee728 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -293,6 +293,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with var seedNodes = SeedNodes var seedNodeProcess: Option[ActorRef] = None var seedNodeProcessCounter = 0 // for unique names + var joinSeedNodesDeadline: Option[Deadline] = None var leaderActionCounter = 0 var exitingTasksInProgress = false @@ -376,9 +377,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with case InitJoin ⇒ logInfo("Received InitJoin message from [{}], but this node is not initialized yet", sender()) sender() ! InitJoinNack(selfAddress) - case ClusterUserAction.JoinTo(address) ⇒ join(address) - case JoinSeedNodes(newSeedNodes) ⇒ joinSeedNodes(newSeedNodes) - case msg: SubscriptionMessage ⇒ publisher forward msg + case ClusterUserAction.JoinTo(address) ⇒ + join(address) + case JoinSeedNodes(newSeedNodes) ⇒ + resetJoinSeedNodesDeadline() + joinSeedNodes(newSeedNodes) + case msg: SubscriptionMessage ⇒ + publisher forward msg + case _: Tick ⇒ + if (joinSeedNodesDeadline.exists(_.isOverdue)) + joinSeedNodesWasUnsuccessful() }: Actor.Receive).orElse(receiveExitingCompleted) def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({ @@ -390,11 +398,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with becomeUninitialized() join(address) case JoinSeedNodes(newSeedNodes) ⇒ + resetJoinSeedNodesDeadline() becomeUninitialized() joinSeedNodes(newSeedNodes) case msg: SubscriptionMessage ⇒ publisher forward msg case _: Tick ⇒ - if (deadline.exists(_.isOverdue)) { + if (joinSeedNodesDeadline.exists(_.isOverdue)) + joinSeedNodesWasUnsuccessful() + else if (deadline.exists(_.isOverdue)) { // join attempt failed, retry becomeUninitialized() if (seedNodes.nonEmpty) joinSeedNodes(seedNodes) @@ -402,6 +413,22 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } }: Actor.Receive).orElse(receiveExitingCompleted) + private def resetJoinSeedNodesDeadline(): Unit = { + joinSeedNodesDeadline = ShutdownAfterUnsuccessfulJoinSeedNodes match { + case d: FiniteDuration ⇒ Some(Deadline.now + d) + case _ ⇒ None // off + } + } + + private def joinSeedNodesWasUnsuccessful(): Unit = { + log.warning( + "Joining of seed-nodes [{}] was unsuccessful after configured " + + "shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.", + seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes) + joinSeedNodesDeadline = None + CoordinatedShutdown(context.system).run() + } + def becomeUninitialized(): Unit = { // make sure that join process is stopped stopSeedNodeProcess() @@ -415,6 +442,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with withDispatcher(UseDispatcher), name = "heartbeatSender") // make sure that join process is stopped stopSeedNodeProcess() + joinSeedNodesDeadline = None context.become(initialized) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index d3c89e17d7..3d6f50d91f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -43,6 +43,13 @@ final class ClusterSettings(val config: Config, val systemName: String) { case _ ⇒ cc.getMillisDuration(key) requiring (_ > Duration.Zero, key + " > 0s, or off") } } + val ShutdownAfterUnsuccessfulJoinSeedNodes: Duration = { + val key = "shutdown-after-unsuccessful-join-seed-nodes" + toRootLowerCase(cc.getString(key)) match { + case "off" ⇒ Duration.Undefined + case _ ⇒ cc.getMillisDuration(key) requiring (_ > Duration.Zero, key + " > 0s, or off") + } + } val PeriodicTasksInitialDelay: FiniteDuration = cc.getMillisDuration("periodic-tasks-initial-delay") val GossipInterval: FiniteDuration = cc.getMillisDuration("gossip-interval") val GossipTimeToLive: FiniteDuration = { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 481556bd77..8530c0388d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -28,6 +28,7 @@ class ClusterConfigSpec extends AkkaSpec { SeedNodes should ===(Vector.empty[Address]) SeedNodeTimeout should ===(5 seconds) RetryUnsuccessfulJoinAfter should ===(10 seconds) + ShutdownAfterUnsuccessfulJoinSeedNodes should ===(Duration.Undefined) PeriodicTasksInitialDelay should ===(1 seconds) GossipInterval should ===(1 second) GossipTimeToLive should ===(2 seconds) diff --git a/akka-cluster/src/test/scala/akka/cluster/ShutdownAfterJoinSeedNodesSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ShutdownAfterJoinSeedNodesSpec.scala new file mode 100644 index 0000000000..6270f01a19 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ShutdownAfterJoinSeedNodesSpec.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.cluster + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.actor.Address +import akka.testkit._ + +object ShutdownAfterJoinSeedNodesSpec { + + val config = """ + akka.actor.provider = "cluster" + akka.coordinated-shutdown.terminate-actor-system = on + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.cluster { + seed-node-timeout = 2s + retry-unsuccessful-join-after = 2s + shutdown-after-unsuccessful-join-seed-nodes = 5s + } + """ +} + +class ShutdownAfterJoinSeedNodesSpec extends AkkaSpec(ShutdownAfterJoinSeedNodesSpec.config) { + + val seed1 = ActorSystem(system.name, system.settings.config) + val seed2 = ActorSystem(system.name, system.settings.config) + val oridinary1 = ActorSystem(system.name, system.settings.config) + + override protected def afterTermination(): Unit = { + shutdown(seed1) + shutdown(seed2) + shutdown(oridinary1) + } + + "Joining seed nodes" must { + "be aborted after shutdown-after-unsuccessful-join-seed-nodes" taggedAs LongRunningTest in { + + val seedNodes: immutable.IndexedSeq[Address] = Vector(seed1, seed2).map(s ⇒ Cluster(s).selfAddress) + shutdown(seed1) // crash so that others will not be able to join + + Cluster(seed2).joinSeedNodes(seedNodes) + Cluster(oridinary1).joinSeedNodes(seedNodes) + + Await.result(seed2.whenTerminated, Cluster(seed2).settings.ShutdownAfterUnsuccessfulJoinSeedNodes + 10.second) + Await.result(oridinary1.whenTerminated, Cluster(seed2).settings.ShutdownAfterUnsuccessfulJoinSeedNodes + 10.second) + } + + } +} diff --git a/akka-docs/src/main/paradox/scala/additional/deploy.md b/akka-docs/src/main/paradox/scala/additional/deploy.md index b94b1235d6..dc456a2dda 100644 --- a/akka-docs/src/main/paradox/scala/additional/deploy.md +++ b/akka-docs/src/main/paradox/scala/additional/deploy.md @@ -8,7 +8,7 @@ be put into `WEB-INF/lib` ## Lightbend Enterprise Suite -An Akka application can also be packaged and deployed using [Lightbend Enterprise Suite](https://conductr.lightbend.com/docs/2.2.x/DevQuickStart). +An Akka application can also be packaged and deployed using [Lightbend Enterprise Suite](https://conductr.lightbend.com/docs/2.1.x/DevQuickStart). Lightbend Enterprise Suite is a solution for managing Lightbend Reactive Platform applications across a cluster of machines. It is reactive from the ground up thus enabling operations to provide the resiliency required to unleash the full benefits diff --git a/akka-docs/src/main/paradox/scala/cluster-usage.md b/akka-docs/src/main/paradox/scala/cluster-usage.md index d9467b3540..4071b78594 100644 --- a/akka-docs/src/main/paradox/scala/cluster-usage.md +++ b/akka-docs/src/main/paradox/scala/cluster-usage.md @@ -63,9 +63,6 @@ akka { } } -# Disable legacy metrics in akka-cluster. -akka.cluster.metrics.enabled=off - # Enable metrics extension in akka-cluster-metrics. akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"] @@ -111,8 +108,11 @@ The source code of this sample can be found in the ## Joining to Seed Nodes You may decide if joining to the cluster should be done manually or automatically -to configured initial contact points, so-called seed nodes. When a new node is started -it sends a message to all seed nodes and then sends join command to the one that +to configured initial contact points, so-called seed nodes. After the joining process +the seed nodes are not special and they participate in the cluster in exactly the same +way as other nodes. + +When a new node is started it sends a message to all seed nodes and then sends join command to the one that answers first. If no one of the seed nodes replied (might not be started yet) it retries this procedure until successful or shutdown. @@ -131,6 +131,12 @@ This can also be defined as Java system properties when starting the JVM using t -Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552 ``` +Such configuration is typically created dynamically by external tools, see for example: + +* [Deploying clustered Akka applications on Kubernetes](http://developer.lightbend.com/guides/k8-akka-cluster/) +* [ConductR](https://conductr.lightbend.com/docs/2.1.x/AkkaAndPlay#Akka-Clustering) +* [ConstructR](https://github.com/hseeberger/constructr) + The seed nodes can be started in any order and it is not necessary to have all seed nodes running, but the node configured as the first element in the `seed-nodes` configuration list must be started when initially starting a cluster, otherwise the @@ -143,23 +149,16 @@ can join. Once more than two seed nodes have been started it is no problem to shut down the first seed node. If the first seed node is restarted, it will first try to join the other -seed nodes in the existing cluster. - -If you don't configure seed nodes you need to join the cluster programmatically or manually. - -Manual joining can be performed by using [JMX](#cluster-jmx) or [HTTP](#cluster-http). -Joining programmatically can be performed with @scala[`Cluster(system).join`]@java[`Cluster.get(system).join`]. Unsuccessful join attempts are -automatically retried after the time period defined in configuration property `retry-unsuccessful-join-after`. -Retries can be disabled by setting the property to `off`. - -You can join to any node in the cluster. It does not have to be configured as a seed node. -Note that you can only join to an existing cluster member, which means that for bootstrapping some -node must join itself,and then the following nodes could join them to make up a cluster. +seed nodes in the existing cluster. Note that if you stop all seed nodes at the same time +and restart them with the same `seed-nodes` configuration they will join themselves and +form a new cluster instead of joining remaining nodes of the existing cluster. That is +likely not desired and should be avoided by listing several nodes as seed nodes for redundancy +and don't stop all of them at the same time. You may also use @scala[`Cluster(system).joinSeedNodes`]@java[`Cluster.get(system).joinSeedNodes`] to join programmatically, which is attractive when dynamically discovering other nodes at startup by using some external tool or API. When using `joinSeedNodes` you should not include the node itself except for the node that is -supposed to be the first seed node, and that should be placed first in parameter to +supposed to be the first seed node, and that should be placed first in the parameter to `joinSeedNodes`. Unsuccessful attempts to contact seed nodes are automatically retried after the time period defined in @@ -169,6 +168,25 @@ tries to contact all seed nodes and then joins the node that answers first. The of seed nodes will join itself if it cannot contact any of the other seed nodes within the configured `seed-node-timeout`. +The joining of given seed nodes will by default be retried indefinitely until +a successful join. That process can be aborted if unsuccessful by configuring a +timeout. When aborted it will run @ref:[Coordinated Shutdown](actors.md#coordinated-shutdown), +which by default will terminated the ActorSystem. CoordinatedShutdown can also be configured to exit +the JVM. It is useful to define this timeout if the `seed-nodes` are assembled +dynamically and a restart with new seed-nodes should be tried after unsuccessful +attempts. + +``` +akka.cluster.shutdown-after-unsuccessful-join-seed-nodes = 20s +akka.coordinated-shutdown.terminate-actor-system = on +``` + +If you don't configure seed nodes or use `joinSeedNodes` you need to join the cluster manually, which can be performed by using [JMX](#cluster-jmx) or [HTTP](#cluster-http). + +You can join to any node in the cluster. It does not have to be configured as a seed node. +Note that you can only join to an existing cluster member, which means that for bootstrapping some +node must join itself,and then the following nodes could join them to make up a cluster. + An actor system can only join a cluster once. Additional attempts will be ignored. When it has successfully joined it must be restarted to be able to join another cluster or to join the same cluster again. It can use the same host name and port