Add Lightbend's SBR to Akka Cluster, #29085 (#29099)

* change package name to akka.cluster.sbr
* reference.conf keeps the same config paths
* akka.cluster.sbr.SplitBrainResolverProvider instead of com.lightbend.akka.sbr.SplitBrainResolverProvider
* add dependency from akka-cluster to akka-coordination, for the lease strategy
* move TestLease to akka-coordination and use that in SBR tests
* remove keep-referee strategy
* use keep-majority by default
* review and adjust reference documentation
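
For reference, enabling the relocated resolver now looks like this; a minimal sketch with the new provider class and the keep-majority default (values shown match the reference.conf defaults, the system setup is illustrative):

    // Sketch only: enabling the moved SBR via the new provider class;
    // keep-majority is the default strategy after this change.
    import com.typesafe.config.ConfigFactory

    val sbrConfig = ConfigFactory.parseString("""
      akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
      akka.cluster.split-brain-resolver {
        active-strategy = keep-majority
        stable-after = 20s
      }
      """)
    // e.g. ActorSystem("ClusterSystem", sbrConfig.withFallback(ConfigFactory.load()))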

Co-authored-by: Johan Andrén <johan@markatta.com>
Co-authored-by: Johannes Rudolph <johannes.rudolph@gmail.com>
Co-authored-by: Christopher Batey <christopher.batey@gmail.com>
Co-authored-by: Arnout Engelen <github@bzzt.net>
Patrik Nordwall 2020-05-25 12:21:13 +02:00 committed by GitHub
parent e0586e546c
commit c45e6ef39b
37 changed files with 5612 additions and 67 deletions


@@ -0,0 +1,97 @@
/*
* Copyright (C) 2015-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.sharding.ShardRegion
import akka.serialization.jackson.CborSerializable
object GlobalRegistry {
final case class Register(key: String, address: Address) extends CborSerializable
final case class Unregister(key: String, address: Address) extends CborSerializable
final case class DoubleRegister(key: String, msg: String) extends CborSerializable
def props(probe: ActorRef, onlyErrors: Boolean): Props =
Props(new GlobalRegistry(probe, onlyErrors))
object SingletonActor {
def props(registry: ActorRef): Props =
Props(new SingletonActor(registry))
val extractEntityId: ShardRegion.ExtractEntityId = {
case id: Int => (id.toString, id)
}
val extractShardId: ShardRegion.ExtractShardId = msg =>
msg match {
case id: Int => (id % 10).toString
}
}
class SingletonActor(registry: ActorRef) extends Actor with ActorLogging {
val key = self.path.toStringWithoutAddress + "-" + Cluster(context.system).selfDataCenter
override def preStart(): Unit = {
log.info("Starting")
registry ! Register(key, Cluster(context.system).selfAddress)
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
// don't call postStop
}
override def postStop(): Unit = {
log.info("Stopping")
registry ! Unregister(key, Cluster(context.system).selfAddress)
}
override def receive = {
case i: Int => sender() ! i
}
}
}
class GlobalRegistry(probe: ActorRef, onlyErrors: Boolean) extends Actor with ActorLogging {
import GlobalRegistry._
var registry = Map.empty[String, Address]
var unregisterTimestamp = Map.empty[String, Long]
override def receive = {
case r @ Register(key, address) =>
log.info("{}", r)
if (registry.contains(key)) {
val errMsg = s"trying to register $address, but ${registry(key)} was already registered for $key"
log.error(errMsg)
probe ! DoubleRegister(key, errMsg)
} else {
unregisterTimestamp.get(key).foreach { t =>
log.info("Unregister/register margin for [{}] was [{}] ms", key, (System.nanoTime() - t).nanos.toMillis)
}
registry += key -> address
if (!onlyErrors) probe ! r
}
case u @ Unregister(key, address) =>
log.info("{}", u)
if (!registry.contains(key))
probe ! s"$key was not registered"
else if (registry(key) != address)
probe ! s"${registry(key)} instead of $address was registered for $key"
else {
registry -= key
unregisterTimestamp += key -> System.nanoTime()
if (!onlyErrors) probe ! u
}
}
}
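
The registry above is the split-brain detector used by the integration specs that follow: each singleton or sharded entity registers a globally unique key on start and unregisters it on stop, so a second Register for a key that is still held means both sides of a partition ran the same actor and the probe receives a DoubleRegister. A minimal sketch of that wiring (system, key and address are illustrative; GlobalRegistry is assumed to be on the test classpath):

    import akka.actor.{ ActorSystem, Address }
    import akka.testkit.TestProbe
    import scala.concurrent.duration._

    // Sketch only: how the specs use GlobalRegistry.
    val system = ActorSystem("sbr-registry-demo")
    val probe = TestProbe()(system)
    // onlyErrors = true: successful Register/Unregister are not forwarded, only errors are
    val registry = system.actorOf(GlobalRegistry.props(probe.ref, onlyErrors = true), "registry")

    val key = "/user/singletonRegistry/singleton-default"
    registry ! GlobalRegistry.Register(key, Address("akka", system.name, "127.0.0.1", 2551))
    probe.expectNoMessage(100.millis)
    // a second Register for the same key from another node would make the
    // registry reply to the probe with GlobalRegistry.DoubleRegister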


@@ -0,0 +1,60 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.pattern.pipe
import akka.remote.RemoteActorRefProvider
import akka.remote.transport.ThrottlerTransportAdapter.Blackhole
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle
import akka.remote.transport.ThrottlerTransportAdapter.Unthrottled
import akka.serialization.jackson.CborSerializable
object GremlinController {
final case class BlackholeNode(target: Address) extends CborSerializable
final case class PassThroughNode(target: Address) extends CborSerializable
case object GetAddress extends CborSerializable
def props: Props =
Props(new GremlinController)
}
class GremlinController extends Actor with ActorLogging {
import context.dispatcher
import GremlinController._
val transport =
context.system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
val selfAddress = Cluster(context.system).selfAddress
override def receive = {
case GetAddress =>
sender() ! selfAddress
case BlackholeNode(target) =>
log.debug("Blackhole {} <-> {}", selfAddress, target)
transport.managementCommand(SetThrottle(target, Direction.Both, Blackhole)).pipeTo(sender())
case PassThroughNode(target) =>
log.debug("PassThrough {} <-> {}", selfAddress, target)
transport.managementCommand(SetThrottle(target, Direction.Both, Unthrottled)).pipeTo(sender())
}
}
object GremlinControllerProxy {
def props(target: ActorRef): Props =
Props(new GremlinControllerProxy(target))
}
class GremlinControllerProxy(target: ActorRef) extends Actor {
override def receive = {
case msg => target.forward(msg)
}
}


@@ -0,0 +1,409 @@
/*
* Copyright (C) 2015-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterEach
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.singleton.ClusterSingletonManager
import akka.cluster.singleton.ClusterSingletonManagerSettings
import akka.pattern.ask
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import akka.testkit.LongRunningTest
import akka.testkit.TestKit
import akka.testkit.TestProbe
import akka.util.Timeout
/*
* Depends on akka private classes so needs to be in this package
*/
object RandomizedSplitBrainResolverIntegrationSpec extends MultiNodeConfig {
val node1 = role("node1")
val node2 = role("node2")
val node3 = role("node3")
val node4 = role("node4")
val node5 = role("node5")
val node6 = role("node6")
val node7 = role("node7")
val node8 = role("node8")
val node9 = role("node9")
commonConfig(ConfigFactory.parseString(s"""
akka {
loglevel = INFO
cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver {
stable-after = 10s
active-strategy = lease-majority
lease-majority {
lease-implementation = test-lease
}
}
#failure-detector.acceptable-heartbeat-pause = 10s
# speedup timeout
sharding.handoff-timeout = 10 s
# this is starting singleton more aggressively than default (15)
singleton.min-number-of-hand-over-retries = 10
}
actor.provider = cluster
}
test-lease {
lease-class = akka.cluster.sbr.SbrTestLeaseActorClient
heartbeat-interval = 1s
heartbeat-timeout = 120s
lease-operation-timeout = 3s
}
test.random-seed = ${System.currentTimeMillis()}
akka.testconductor.barrier-timeout = 120 s
akka.cluster.run-coordinated-shutdown-when-down = off
"""))
testTransport(on = true)
}
class RandomizedSplitBrainResolverIntegrationSpecMultiJvmNode1 extends RandomizedSplitBrainResolverIntegrationSpec
class RandomizedSplitBrainResolverIntegrationSpecMultiJvmNode2 extends RandomizedSplitBrainResolverIntegrationSpec
class RandomizedSplitBrainResolverIntegrationSpecMultiJvmNode3 extends RandomizedSplitBrainResolverIntegrationSpec
class RandomizedSplitBrainResolverIntegrationSpecMultiJvmNode4 extends RandomizedSplitBrainResolverIntegrationSpec
class RandomizedSplitBrainResolverIntegrationSpecMultiJvmNode5 extends RandomizedSplitBrainResolverIntegrationSpec
class RandomizedSplitBrainResolverIntegrationSpecMultiJvmNode6 extends RandomizedSplitBrainResolverIntegrationSpec
class RandomizedSplitBrainResolverIntegrationSpecMultiJvmNode7 extends RandomizedSplitBrainResolverIntegrationSpec
class RandomizedSplitBrainResolverIntegrationSpecMultiJvmNode8 extends RandomizedSplitBrainResolverIntegrationSpec
class RandomizedSplitBrainResolverIntegrationSpecMultiJvmNode9 extends RandomizedSplitBrainResolverIntegrationSpec
class RandomizedSplitBrainResolverIntegrationSpec
extends MultiNodeSpec(RandomizedSplitBrainResolverIntegrationSpec)
with MultiNodeClusterSpec
with ImplicitSender
with BeforeAndAfterEach {
import GlobalRegistry._
import GremlinController._
import RandomizedSplitBrainResolverIntegrationSpec._
// counter for unique naming for each test
var c = 0
// to be shut down in afterEach
var disposableSys: DisposableSys = _
override def expectedTestDuration = 3.minutes
object DisposableSys {
def apply(scenario: Scenario): DisposableSys = {
disposableSys = new DisposableSys(scenario)
disposableSys
}
}
override def afterEach(): Unit = {
if (disposableSys ne null)
disposableSys.shutdownSys()
}
class DisposableSys(scenario: Scenario) {
c += 1
val sys: ActorSystem = {
val sys = ActorSystem(system.name + "-" + c, system.settings.config)
val gremlinController = sys.actorOf(GremlinController.props, "gremlinController")
system.actorOf(GremlinControllerProxy.props(gremlinController), s"gremlinControllerProxy-$c")
sys
}
val singletonProbe = TestProbe()
val shardingProbe = TestProbe()
runOn(node1) {
system.actorOf(GlobalRegistry.props(singletonProbe.ref, true), s"singletonRegistry-$c")
system.actorOf(GlobalRegistry.props(shardingProbe.ref, true), s"shardingRegistry-$c")
if (scenario.usingLease)
system.actorOf(SbrTestLeaseActor.props, s"lease-${sys.name}")
}
enterBarrier("registry-started")
system.actorSelection(node(node1) / "user" / s"singletonRegistry-$c") ! Identify(None)
val singletonRegistry: ActorRef = expectMsgType[ActorIdentity].ref.get
system.actorSelection(node(node1) / "user" / s"shardingRegistry-$c") ! Identify(None)
val shardingRegistry: ActorRef = expectMsgType[ActorIdentity].ref.get
if (scenario.usingLease) {
system.actorSelection(node(node1) / "user" / s"lease-${sys.name}") ! Identify(None)
val leaseRef: ActorRef = expectMsgType[ActorIdentity].ref.get
SbrTestLeaseActorClientExt(sys).getActorLeaseClient().setActorLeaseRef(leaseRef)
}
enterBarrier("registry-located")
lazy val region = ClusterSharding(sys).shardRegion(s"Entity-$c")
def shutdownSys(): Unit = {
TestKit.shutdownActorSystem(sys, 10.seconds, verifySystemShutdown = true)
}
def gremlinControllerProxy(at: RoleName): ActorRef = {
system.actorSelection(node(at) / "user" / s"gremlinControllerProxy-$c") ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
def sysAddress(at: RoleName): Address = {
implicit val timeout = Timeout(3.seconds)
Await.result((gremlinControllerProxy(at) ? GetAddress).mapTo[Address], timeout.duration)
}
def blackhole(from: RoleName, to: RoleName): Unit = {
implicit val timeout = Timeout(3.seconds)
import system.dispatcher
val f = for {
target <- (gremlinControllerProxy(to) ? GetAddress).mapTo[Address]
done <- gremlinControllerProxy(from) ? BlackholeNode(target)
} yield done
Await.ready(f, timeout.duration * 2)
log.info("Blackhole {} <-> {}", from.name, to.name)
}
def passThrough(from: RoleName, to: RoleName): Unit = {
implicit val timeout = Timeout(3.seconds)
import system.dispatcher
val f = for {
target <- (gremlinControllerProxy(to) ? GetAddress).mapTo[Address]
done <- gremlinControllerProxy(from) ? PassThroughNode(target)
} yield done
Await.ready(f, timeout.duration * 2)
log.info("PassThrough {} <-> {}", from.name, to.name)
}
def join(from: RoleName, to: RoleName, awaitUp: Boolean): Unit = {
runOn(from) {
Cluster(sys).join(sysAddress(to))
createSingleton()
startSharding()
if (awaitUp)
awaitMemberUp()
}
enterBarrier(from.name + s"-joined-$c")
}
def awaitMemberUp(): Unit =
within(10.seconds) {
awaitAssert {
Cluster(sys).state.members.exists { m =>
m.address == Cluster(sys).selfAddress && m.status == MemberStatus.Up
} should be(true)
}
}
def createSingleton(): ActorRef = {
sys.actorOf(
ClusterSingletonManager.props(
singletonProps = SingletonActor.props(singletonRegistry),
terminationMessage = PoisonPill,
settings = ClusterSingletonManagerSettings(system)),
name = "singletonRegistry")
}
def startSharding(): Unit = {
ClusterSharding(sys).start(
typeName = s"Entity-$c",
entityProps = SingletonActor.props(shardingRegistry),
settings = ClusterShardingSettings(system),
extractEntityId = SingletonActor.extractEntityId,
extractShardId = SingletonActor.extractShardId)
}
def verify(): Unit = {
val nodes = roles.take(scenario.numberOfNodes)
def sendToSharding(expectReply: Boolean): Unit = {
runOn(nodes: _*) {
if (!Cluster(sys).isTerminated) {
val probe = TestProbe()(sys)
for (i <- 0 until 10) {
region.tell(i, probe.ref)
if (expectReply)
probe.expectMsg(3.seconds, i)
}
}
}
}
runOn(nodes: _*) {
log.info("Running {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
}
val randomSeed = sys.settings.config.getLong("test.random-seed")
val random = new Random(randomSeed)
enterBarrier(s"log-startup-$c")
within(3.minutes) {
join(nodes.head, nodes.head, awaitUp = true) // oldest
join(nodes.last, nodes.head, awaitUp = true) // next oldest
for (n <- nodes.tail.dropRight(1))
join(n, nodes.head, awaitUp = false)
runOn(nodes: _*) {
awaitMemberUp()
}
enterBarrier(s"all-up-$c")
singletonProbe.expectNoMessage(1.second)
shardingProbe.expectNoMessage(10.millis)
sendToSharding(expectReply = true)
enterBarrier(s"initialized-$c")
runOn(nodes: _*) {
log.info("Initialized {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
}
runOn(node1) {
val cleanSplit = random.nextBoolean()
val healCleanSplit = cleanSplit && random.nextBoolean()
val side1 = nodes.take(1 + random.nextInt(nodes.size - 1))
val side2 = nodes.drop(side1.size)
val numberOfFlaky = random.nextInt(5)
val healLastFlay = numberOfFlaky > 0 && random.nextBoolean()
val flaky: Map[Int, (RoleName, List[RoleName])] =
(0 until numberOfFlaky).map { i =>
val from = nodes(random.nextInt(nodes.size))
val targets = nodes.filterNot(_ == from)
val to = (0 to random.nextInt(math.min(5, targets.size))).map(j => targets(j)).toList
i -> (from -> to)
}.toMap
val delays = (0 until 10).map(_ => 2 + random.nextInt(13))
log.info(s"Generated $scenario with random seed [$randomSeed] in round [$c]: " +
s"cleanSplit [$cleanSplit], healCleanSplit [$healCleanSplit] " +
(if (cleanSplit) s"side1 [${side1.map(_.name).mkString(", ")}], side2 [${side2.map(_.name).mkString(", ")}] ") +
s"flaky [${flaky.map { case (_, (from, to)) => from.name -> to.map(_.name).mkString("(", ", ", ")") }.mkString("; ")}] " +
s"delays [${delays.mkString(", ")}]")
var delayIndex = 0
def nextDelay(): Unit = {
Thread.sleep(delays(delayIndex) * 1000)
delayIndex += 1
}
if (cleanSplit) {
for (n1 <- side1; n2 <- side2)
blackhole(n1, n2)
nextDelay()
}
flaky.foreach {
case (i, (from, to)) =>
if (i != 0) {
// heal previous flakiness
val (prevFrom, prevTo) = flaky(i - 1)
for (n <- prevTo)
passThrough(prevFrom, n)
}
for (n <- to)
blackhole(from, n)
nextDelay()
}
if (healLastFlay) {
val (prevFrom, prevTo) = flaky(flaky.size - 1)
for (n <- prevTo)
passThrough(prevFrom, n)
nextDelay()
}
if (healCleanSplit) {
for (n1 <- side1; n2 <- side2)
passThrough(n1, n2)
}
}
enterBarrier(s"scenario-done-$c")
runOn(nodes: _*) {
sendToSharding(expectReply = false)
singletonProbe.expectNoMessage(10.seconds)
shardingProbe.expectNoMessage(10.millis)
var loopLimit = 20
while (loopLimit != 0 && !Cluster(sys).isTerminated && Cluster(sys).state.unreachable.nonEmpty) {
sendToSharding(expectReply = false)
singletonProbe.expectNoMessage(5.seconds)
shardingProbe.expectNoMessage(10.millis)
loopLimit -= 1
}
}
enterBarrier(s"terminated-or-unreachable-removed-$c")
runOn(nodes: _*) {
(Cluster(sys).isTerminated || Cluster(sys).state.unreachable.isEmpty) should ===(true)
within(30.seconds) {
awaitAssert {
sendToSharding(expectReply = true)
}
}
singletonProbe.expectNoMessage(5.seconds)
shardingProbe.expectNoMessage(10.millis)
if (!Cluster(sys).isTerminated)
log.info(s"Survived ${Cluster(sys).state.members.size} members in round $c")
}
enterBarrier(s"verified-$c")
}
enterBarrier(s"after-$c")
}
}
private val leaseMajorityConfig = ConfigFactory.parseString("""akka.cluster.split-brain-resolver {
active-strategy = lease-majority
}""")
case class Scenario(cfg: Config, numberOfNodes: Int) {
val activeStrategy: String = cfg.getString("akka.cluster.split-brain-resolver.active-strategy")
override def toString: String =
s"Scenario($activeStrategy, $numberOfNodes)"
def usingLease: Boolean = activeStrategy.contains("lease")
}
val scenarios =
List(Scenario(leaseMajorityConfig, 3), Scenario(leaseMajorityConfig, 5), Scenario(leaseMajorityConfig, 9))
"SplitBrainResolver with lease" must {
for (scenario <- scenarios) {
scenario.toString taggedAs LongRunningTest in {
DisposableSys(scenario).verify()
}
}
}
}


@@ -0,0 +1,125 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.Props
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.scaladsl.Lease
import akka.pattern.ask
import akka.serialization.jackson.CborSerializable
import akka.util.Timeout
object SbrTestLeaseActor {
def props: Props =
Props(new SbrTestLeaseActor)
final case class Acquire(owner: String) extends CborSerializable
final case class Release(owner: String) extends CborSerializable
}
class SbrTestLeaseActor extends Actor with ActorLogging {
import SbrTestLeaseActor._
var owner: Option[String] = None
override def receive = {
case Acquire(o) =>
owner match {
case None =>
log.info("ActorLease: acquired by [{}]", o)
owner = Some(o)
sender() ! true
case Some(`o`) =>
log.info("ActorLease: renewed by [{}]", o)
sender() ! true
case Some(existingOwner) =>
log.info("ActorLease: requested by [{}], but already held by [{}]", o, existingOwner)
sender() ! false
}
case Release(o) =>
owner match {
case None =>
log.info("ActorLease: released by [{}] but no owner", o)
owner = Some(o)
sender() ! true
case Some(`o`) =>
log.info("ActorLease: released by [{}]", o)
sender() ! true
case Some(existingOwner) =>
log.info("ActorLease: release attempt by [{}], but held by [{}]", o, existingOwner)
sender() ! false
}
}
}
object SbrTestLeaseActorClientExt extends ExtensionId[SbrTestLeaseActorClientExt] with ExtensionIdProvider {
override def get(system: ActorSystem): SbrTestLeaseActorClientExt = super.get(system)
override def lookup = SbrTestLeaseActorClientExt
override def createExtension(system: ExtendedActorSystem): SbrTestLeaseActorClientExt =
new SbrTestLeaseActorClientExt(system)
}
class SbrTestLeaseActorClientExt(val system: ExtendedActorSystem) extends Extension {
private val leaseClient = new AtomicReference[SbrTestLeaseActorClient]()
def getActorLeaseClient(): SbrTestLeaseActorClient = {
val lease = leaseClient.get
if (lease == null) throw new IllegalStateException("ActorLeaseClient must be set first")
lease
}
def setActorLeaseClient(client: SbrTestLeaseActorClient): Unit =
leaseClient.set(client)
}
class SbrTestLeaseActorClient(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) {
import SbrTestLeaseActor.Acquire
import SbrTestLeaseActor.Release
SbrTestLeaseActorClientExt(system).setActorLeaseClient(this)
private implicit val timeout = Timeout(3.seconds)
private val _leaseRef = new AtomicReference[ActorRef]
private def leaseRef: ActorRef = {
val ref = _leaseRef.get
if (ref == null) throw new IllegalStateException("ActorLeaseRef must be set first")
ref
}
def setActorLeaseRef(ref: ActorRef): Unit =
_leaseRef.set(ref)
override def acquire(): Future[Boolean] = {
(leaseRef ? Acquire(settings.ownerName)).mapTo[Boolean]
}
override def acquire(leaseLostCallback: Option[Throwable] => Unit): Future[Boolean] =
acquire()
override def release(): Future[Boolean] = {
(leaseRef ? Release(settings.ownerName)).mapTo[Boolean]
}
override def checkLease(): Boolean = false
}
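
This client plugs into akka-coordination's Lease SPI: the test-lease block in the specs points lease-class at SbrTestLeaseActorClient, and the specs hand it the lease actor's ActorRef through SbrTestLeaseActorClientExt before anything is acquired. A sketch of resolving and using a lease through that config path (lease name, owner and system name are illustrative):

    import akka.actor.ActorSystem
    import akka.coordination.lease.scaladsl.LeaseProvider
    import com.typesafe.config.ConfigFactory

    // Sketch only: a lease resolved from the "test-lease" config path, as in the specs above.
    val config = ConfigFactory.parseString("""
      test-lease {
        lease-class = akka.cluster.sbr.SbrTestLeaseActorClient
        heartbeat-interval = 1s
        heartbeat-timeout = 120s
        lease-operation-timeout = 3s
      }
      """)
    val system = ActorSystem("sbr-lease-demo", config)
    val lease = LeaseProvider(system).getLease("sbr-demo-lease", "test-lease", "node1@127.0.0.1:2551")

    // The specs register the SbrTestLeaseActor ref first, otherwise acquire() throws:
    //   SbrTestLeaseActorClientExt(system).getActorLeaseClient().setActorLeaseRef(leaseActorRef)
    // acquire() is answered by SbrTestLeaseActor: true when the lease is free or renewed by the same owner.
    val acquired = lease.acquire() // Future[Boolean]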


@@ -0,0 +1,465 @@
/*
* Copyright (C) 2015-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueFactory
import org.scalatest.BeforeAndAfterEach
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.ClusterSettings.DefaultDataCenter
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.singleton.ClusterSingletonManager
import akka.cluster.singleton.ClusterSingletonManagerSettings
import akka.pattern.ask
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import akka.testkit.LongRunningTest
import akka.testkit.TestKit
import akka.testkit.TestProbe
import akka.util.Timeout
/*
* Depends on akka private classes so needs to be in this package
*/
object SplitBrainResolverIntegrationSpec extends MultiNodeConfig {
val node1 = role("node1")
val node2 = role("node2")
val node3 = role("node3")
val node4 = role("node4")
val node5 = role("node5")
val node6 = role("node6")
val node7 = role("node7")
val node8 = role("node8")
val node9 = role("node9")
commonConfig(ConfigFactory.parseString("""
akka {
loglevel = INFO
cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver.active-strategy = keep-majority
split-brain-resolver.stable-after = 10s
sharding.handoff-timeout = 5s
}
actor.provider = cluster
remote.log-remote-lifecycle-events = off
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off
"""))
testTransport(on = true)
}
class SplitBrainResolverIntegrationSpecMultiJvmNode1 extends SplitBrainResolverIntegrationSpec
class SplitBrainResolverIntegrationSpecMultiJvmNode2 extends SplitBrainResolverIntegrationSpec
class SplitBrainResolverIntegrationSpecMultiJvmNode3 extends SplitBrainResolverIntegrationSpec
class SplitBrainResolverIntegrationSpecMultiJvmNode4 extends SplitBrainResolverIntegrationSpec
class SplitBrainResolverIntegrationSpecMultiJvmNode5 extends SplitBrainResolverIntegrationSpec
class SplitBrainResolverIntegrationSpecMultiJvmNode6 extends SplitBrainResolverIntegrationSpec
class SplitBrainResolverIntegrationSpecMultiJvmNode7 extends SplitBrainResolverIntegrationSpec
class SplitBrainResolverIntegrationSpecMultiJvmNode8 extends SplitBrainResolverIntegrationSpec
class SplitBrainResolverIntegrationSpecMultiJvmNode9 extends SplitBrainResolverIntegrationSpec
class SplitBrainResolverIntegrationSpec
extends MultiNodeSpec(SplitBrainResolverIntegrationSpec)
with MultiNodeClusterSpec
with ImplicitSender
with BeforeAndAfterEach {
import GlobalRegistry._
import GremlinController._
import SplitBrainResolverIntegrationSpec._
override def initialParticipants = roles.size
override def afterEach(): Unit = {
if (disposableSys ne null)
disposableSys.shutdownSys()
}
// counter for unique naming for each test
var c = 0
// to be shut down in afterEach
var disposableSys: DisposableSys = _
override def expectedTestDuration = 10.minutes
object DisposableSys {
def apply(scenario: Scenario): DisposableSys = {
disposableSys = new DisposableSys(scenario)
disposableSys
}
}
class DisposableSys(scenario: Scenario) {
c += 1
val sys: ActorSystem = {
val dcName = scenario.dcDecider(myself)
val sys = ActorSystem(
system.name + "-" + c,
scenario.cfg
.withValue("akka.cluster.multi-data-center.self-data-center", ConfigValueFactory.fromAnyRef(dcName))
.withFallback(system.settings.config))
val gremlinController = sys.actorOf(GremlinController.props, "gremlinController")
system.actorOf(GremlinControllerProxy.props(gremlinController), s"gremlinControllerProxy-$c")
sys
}
val singletonProbe = TestProbe()
val shardingProbe = TestProbe()
runOn(node1) {
system.actorOf(GlobalRegistry.props(singletonProbe.ref, false), s"singletonRegistry-$c")
system.actorOf(GlobalRegistry.props(shardingProbe.ref, true), s"shardingRegistry-$c")
if (scenario.usingLease)
system.actorOf(SbrTestLeaseActor.props, s"lease-${sys.name}")
}
enterBarrier("registry-started")
system.actorSelection(node(node1) / "user" / s"singletonRegistry-$c") ! Identify(None)
val singletonRegistry: ActorRef = expectMsgType[ActorIdentity].ref.get
system.actorSelection(node(node1) / "user" / s"shardingRegistry-$c") ! Identify(None)
val shardingRegistry: ActorRef = expectMsgType[ActorIdentity].ref.get
if (scenario.usingLease) {
system.actorSelection(node(node1) / "user" / s"lease-${sys.name}") ! Identify(None)
val leaseRef: ActorRef = expectMsgType[ActorIdentity].ref.get
SbrTestLeaseActorClientExt(sys).getActorLeaseClient().setActorLeaseRef(leaseRef)
}
enterBarrier("registry-located")
lazy val region = ClusterSharding(sys).shardRegion(s"Entity-$c")
def shutdownSys(): Unit = {
TestKit.shutdownActorSystem(sys, 10.seconds, verifySystemShutdown = true)
}
def gremlinControllerProxy(at: RoleName): ActorRef = {
system.actorSelection(node(at) / "user" / s"gremlinControllerProxy-$c") ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
def sysAddress(at: RoleName): Address = {
implicit val timeout = Timeout(3.seconds)
Await.result((gremlinControllerProxy(at) ? GetAddress).mapTo[Address], timeout.duration)
}
def blackhole(from: RoleName, to: RoleName): Unit = {
implicit val timeout = Timeout(3.seconds)
import system.dispatcher
val f = for {
target <- (gremlinControllerProxy(to) ? GetAddress).mapTo[Address]
done <- gremlinControllerProxy(from) ? BlackholeNode(target)
} yield done
Await.ready(f, timeout.duration * 2)
log.info("Blackhole {} <-> {}", from.name, to.name)
}
def join(from: RoleName, to: RoleName, awaitUp: Boolean): Unit = {
runOn(from) {
Cluster(sys).join(sysAddress(to))
createSingleton()
startSharding()
if (awaitUp)
awaitMemberUp()
}
enterBarrier(from.name + s"-joined-$c")
}
def awaitMemberUp(): Unit =
within(10.seconds) {
awaitAssert {
Cluster(sys).state.members.exists { m =>
m.address == Cluster(sys).selfAddress && m.status == MemberStatus.Up
} should be(true)
}
}
def awaitAllMembersUp(nodes: RoleName*): Unit = {
val addresses = nodes.map(sysAddress).toSet
within(15.seconds) {
awaitAssert {
Cluster(sys).state.members.map(_.address) should ===(addresses)
Cluster(sys).state.members.foreach {
_.status should ===(MemberStatus.Up)
}
}
}
}
def createSingleton(): ActorRef = {
sys.actorOf(
ClusterSingletonManager.props(
singletonProps = SingletonActor.props(singletonRegistry),
terminationMessage = PoisonPill,
settings = ClusterSingletonManagerSettings(system)),
name = "singletonRegistry")
}
def startSharding(): Unit = {
ClusterSharding(sys).start(
typeName = s"Entity-$c",
entityProps = SingletonActor.props(shardingRegistry),
settings = ClusterShardingSettings(system),
extractEntityId = SingletonActor.extractEntityId,
extractShardId = SingletonActor.extractShardId)
}
def verify(): Unit = {
val side1 = roles.take(scenario.side1Size)
val side2 = roles.drop(scenario.side1Size).take(scenario.side2Size)
def singletonRegisterKey(node: RoleName): String =
"/user/singletonRegistry/singleton-" + scenario.dcDecider(node)
runOn(side1 ++ side2: _*) {
log.info("Running {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
}
enterBarrier(s"log-startup-$c")
within(90.seconds) {
join(side1.head, side1.head, awaitUp = true) // oldest
join(side2.head, side1.head, awaitUp = true) // next oldest
for (n <- side1.tail ++ side2.tail)
join(n, side1.head, awaitUp = false)
runOn(side1 ++ side2: _*) {
awaitAllMembersUp(side1 ++ side2: _*)
}
enterBarrier(s"all-up-$c")
runOn(node1) {
singletonProbe.within(25.seconds) {
singletonProbe.expectMsg(Register(singletonRegisterKey(node1), sysAddress(node1)))
}
shardingProbe.expectNoMessage(100.millis)
}
runOn(side1 ++ side2: _*) {
val probe = TestProbe()(sys)
for (i <- 0 until 10) {
region.tell(i, probe.ref)
probe.expectMsg(5.seconds, i)
}
}
enterBarrier(s"initialized-$c")
runOn(side1 ++ side2: _*) {
log.info("Initialized {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
}
runOn(node1) {
for (n1 <- side1; n2 <- side2)
blackhole(n1, n2)
}
enterBarrier(s"blackhole-$c")
val resolvedExpected = scenario.expected match {
case KeepLeader =>
import Member.addressOrdering
val address = (side1 ++ side2).map(sysAddress).min
if (side1.exists(sysAddress(_) == address)) KeepSide1
else if (side2.exists(sysAddress(_) == address)) KeepSide2
else ShutdownBoth
case other => other
}
resolvedExpected match {
case ShutdownBoth =>
runOn(side1 ++ side2: _*) {
awaitCond(Cluster(sys).isTerminated, max = 30.seconds)
}
enterBarrier(s"sys-terminated-$c")
runOn(node1) {
singletonProbe.within(20.seconds) {
singletonProbe.expectMsg(Unregister(singletonRegisterKey(side1.head), sysAddress(side1.head)))
}
shardingProbe.expectNoMessage(100.millis)
}
case KeepSide1 =>
runOn(side1: _*) {
val expectedAddresses = side1.map(sysAddress).toSet
within(remaining - 3.seconds) {
awaitAssert {
val probe = TestProbe()(sys)
for (i <- 0 until 10) {
region.tell(i, probe.ref)
probe.expectMsg(2.seconds, i)
}
Cluster(sys).state.members.map(_.address) should be(expectedAddresses)
}
}
}
runOn(side2: _*) {
awaitCond(Cluster(sys).isTerminated, max = 30.seconds)
}
enterBarrier(s"cluster-shutdown-verified-$c")
singletonProbe.expectNoMessage(1.second)
shardingProbe.expectNoMessage(100.millis)
case KeepSide2 =>
runOn(side1: _*) {
awaitCond(Cluster(sys).isTerminated, max = 30.seconds)
}
enterBarrier(s"sys-terminated-$c")
runOn(node1) {
singletonProbe.within(30.seconds) {
singletonProbe.expectMsg(Unregister(singletonRegisterKey(side1.head), sysAddress(side1.head)))
singletonProbe.expectMsg(Register(singletonRegisterKey(side2.head), sysAddress(side2.head)))
}
shardingProbe.expectNoMessage(100.millis)
}
runOn(side2: _*) {
val expectedAddresses = side2.map(sysAddress).toSet
within(remaining - 3.seconds) {
awaitAssert {
val probe = TestProbe()(sys)
for (i <- 0 until 10) {
region.tell(i, probe.ref)
probe.expectMsg(2.seconds, i)
}
Cluster(sys).state.members.map(_.address) should be(expectedAddresses)
}
}
}
case KeepAll =>
runOn((side1 ++ side2): _*) {
val expectedAddresses = (side1 ++ side2).map(sysAddress).toSet
within(remaining - 3.seconds) {
awaitAssert {
val probe = TestProbe()(sys)
for (i <- 0 until 10) {
region.tell(i, probe.ref)
probe.expectMsg(2.seconds, i)
}
Cluster(sys).state.members.map(_.address) should be(expectedAddresses)
}
}
Cluster(sys).isTerminated should be(false)
}
enterBarrier(s"cluster-intact-verified-$c")
case KeepLeader => throw new IllegalStateException // already resolved to other case
}
enterBarrier(s"verified-$c")
}
enterBarrier(s"after-$c")
}
}
private val staticQuorumConfig = ConfigFactory.parseString("""akka.cluster.split-brain-resolver {
active-strategy = static-quorum
static-quorum.quorum-size = 5
}""")
private val keepMajorityConfig = ConfigFactory.parseString("""akka.cluster.split-brain-resolver {
active-strategy = keep-majority
}""")
private val keepOldestConfig = ConfigFactory.parseString("""akka.cluster.split-brain-resolver {
active-strategy = keep-oldest
}""")
private val downAllConfig = ConfigFactory.parseString("""akka.cluster.split-brain-resolver {
active-strategy = down-all
}""")
private val leaseMajorityConfig = ConfigFactory.parseString("""akka.cluster.split-brain-resolver {
active-strategy = lease-majority
lease-majority {
lease-implementation = test-lease
acquire-lease-delay-for-minority = 3s
}
}
test-lease {
lease-class = akka.cluster.sbr.SbrTestLeaseActorClient
heartbeat-interval = 1s
heartbeat-timeout = 120s
lease-operation-timeout = 3s
}
""")
sealed trait Expected
case object KeepSide1 extends Expected
case object KeepSide2 extends Expected
case object ShutdownBoth extends Expected
case object KeepLeader extends Expected
case object KeepAll extends Expected
val defaultDcDecider: RoleName => DataCenter = _ => DefaultDataCenter
case class Scenario(
cfg: Config,
side1Size: Int,
side2Size: Int,
expected: Expected,
dcDecider: RoleName => DataCenter = defaultDcDecider // allows to set the dc per indexed node
) {
val activeStrategy: String = cfg.getString("akka.cluster.split-brain-resolver.active-strategy")
override def toString: String = {
s"$expected when using $activeStrategy and side1=$side1Size and side2=$side2Size" +
(if (dcDecider ne defaultDcDecider) "with multi-DC" else "")
}
def usingLease: Boolean = activeStrategy.contains("lease")
}
val scenarios = List(
Scenario(staticQuorumConfig, 1, 2, ShutdownBoth),
Scenario(staticQuorumConfig, 4, 4, ShutdownBoth),
Scenario(staticQuorumConfig, 5, 4, KeepSide1),
Scenario(staticQuorumConfig, 1, 5, KeepSide2),
Scenario(staticQuorumConfig, 4, 5, KeepSide2),
Scenario(keepMajorityConfig, 2, 1, KeepSide1),
Scenario(keepMajorityConfig, 1, 2, KeepSide2),
Scenario(keepMajorityConfig, 4, 5, KeepSide2),
Scenario(keepMajorityConfig, 4, 4, KeepLeader),
Scenario(keepOldestConfig, 3, 3, KeepSide1),
Scenario(keepOldestConfig, 1, 1, KeepSide1),
Scenario(keepOldestConfig, 1, 2, KeepSide2), // because down-if-alone
Scenario(keepMajorityConfig, 3, 2, KeepAll, {
case `node1` | `node2` | `node3` => "dcA"
case _ => "dcB"
}),
Scenario(downAllConfig, 1, 2, ShutdownBoth),
Scenario(leaseMajorityConfig, 4, 5, KeepSide2))
"Cluster SplitBrainResolver" must {
for (scenario <- scenarios) {
scenario.toString taggedAs LongRunningTest in {
DisposableSys(scenario).verify()
}
}
}
}


@@ -11,7 +11,9 @@ import scala.util.control.NoStackTrace
import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.Props
import akka.cluster.{ Cluster, MemberStatus, TestLease, TestLeaseExt }
import akka.cluster.{ Cluster, MemberStatus }
import akka.coordination.lease.TestLease
import akka.coordination.lease.TestLeaseExt
import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.testkit.TestActors.EchoActor


@@ -12,20 +12,21 @@ import scala.util.Success
import scala.util.control.NoStackTrace
import akka.actor.{ Actor, ActorLogging, PoisonPill, Props }
import akka.cluster.TestLeaseExt
import akka.cluster.sharding.ShardRegion.ShardInitialized
import akka.coordination.lease.LeaseUsageSettings
import akka.coordination.lease.TestLease
import akka.coordination.lease.TestLeaseExt
import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe }
object ShardSpec {
val config =
"""
s"""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.classic.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
test-lease {
lease-class = akka.cluster.TestLease
lease-class = ${classOf[TestLease].getName}
heartbeat-interval = 1s
heartbeat-timeout = 120s
lease-operation-timeout = 3s


@@ -11,8 +11,10 @@ import com.typesafe.config.ConfigFactory
import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, Address, Identify, PoisonPill, Props }
import akka.cluster._
import akka.cluster.MemberStatus.Up
import akka.cluster.TestLeaseActor._
import akka.cluster.singleton.ClusterSingletonManagerLeaseSpec.ImportantSingleton.Response
import akka.coordination.lease.TestLeaseActor
import akka.coordination.lease.TestLeaseActorClient
import akka.coordination.lease.TestLeaseActorClientExt
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.testkit._
@@ -25,14 +27,14 @@ object ClusterSingletonManagerLeaseSpec extends MultiNodeConfig {
testTransport(true)
commonConfig(ConfigFactory.parseString("""
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
akka.cluster.testkit.auto-down-unreachable-after = 0s
test-lease {
lease-class = akka.cluster.TestLeaseActorClient
lease-class = ${classOf[TestLeaseActorClient].getName}
heartbeat-interval = 1s
heartbeat-timeout = 120s
lease-operation-timeout = 3s
@@ -79,6 +81,7 @@ class ClusterSingletonManagerLeaseSpec
import ClusterSingletonManagerLeaseSpec._
import ClusterSingletonManagerLeaseSpec.ImportantSingleton._
import TestLeaseActor._
override def initialParticipants = roles.size
@@ -128,10 +131,11 @@ class ClusterSingletonManagerLeaseSpec
}
"Start singleton and ping from all nodes" in {
runOn(first, second, third, fourth) {
// fourth doesn't have the worker role
runOn(first, second, third) {
system.actorOf(
ClusterSingletonManager
.props(props(), PoisonPill, ClusterSingletonManagerSettings(system).withRole("worker")),
.props(ImportantSingleton.props(), PoisonPill, ClusterSingletonManagerSettings(system).withRole("worker")),
"important")
}
enterBarrier("singleton-started")


@@ -20,10 +20,8 @@ import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.TestLease
import akka.cluster.TestLease.AcquireReq
import akka.cluster.TestLease.ReleaseReq
import akka.cluster.TestLeaseExt
import akka.coordination.lease.TestLease
import akka.coordination.lease.TestLeaseExt
import akka.testkit.AkkaSpec
import akka.testkit.TestException
import akka.testkit.TestProbe
@@ -55,6 +53,7 @@ class ClusterSingletonLeaseSpec extends AkkaSpec(ConfigFactory.parseString("""
lease-retry-interval = 2000ms
}
""").withFallback(TestLease.config)) {
import TestLease.{ AcquireReq, ReleaseReq }
val cluster = Cluster(system)
val testLeaseExt = TestLeaseExt(system)


@@ -41,6 +41,8 @@ akka {
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551",
"akka://ClusterSystem@127.0.0.1:2552"]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
#config-seeds


@@ -42,6 +42,10 @@ akka {
# This is useful if you implement downing strategies that handle network partitions,
# e.g. by keeping the larger side of the partition and shutting down the smaller side.
# Disable with "off" or specify a duration to enable.
#
# When using the `akka.cluster.sbr.SplitBrainResolver` as downing provider it will use
# the akka.cluster.split-brain-resolver.stable-after as the default down-removal-margin
# if this down-removal-margin is undefined.
down-removal-margin = off
# Pluggable support for downing of nodes in the cluster.
@@ -364,3 +368,113 @@ akka {
}
}
#//#split-brain-resolver
# To enable the split brain resolver you first need to enable the provider in your application.conf:
# akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
akka.cluster.split-brain-resolver {
# Select one of the available strategies (see descriptions below):
# static-quorum, keep-majority, keep-oldest, down-all, lease-majority
active-strategy = keep-majority
#//#stable-after
# Time margin after which shards or singletons that belonged to a downed/removed
# partition are created in surviving partition. The purpose of this margin is that
# in case of a network partition the persistent actors in the non-surviving partitions
# must be stopped before corresponding persistent actors are started somewhere else.
# This is useful if you implement downing strategies that handle network partitions,
# e.g. by keeping the larger side of the partition and shutting down the smaller side.
# Decision is taken by the strategy when there has been no membership or
# reachability changes for this duration, i.e. the cluster state is stable.
stable-after = 20s
#//#stable-after
# When reachability observations by the failure detector are changed the SBR decisions
# are deferred until there are no changes within the 'stable-after' duration.
# If this continues for too long it might be an indication of an unstable system/network
# and it could result in delayed or conflicting decisions on separate sides of a network
# partition.
# As a precaution for that scenario all nodes are downed if no decision is made within
# `stable-after + down-all-when-unstable` from the first unreachability event.
# The measurement is reset if all unreachable have been healed, downed or removed, or
# if there are no changes within `stable-after * 2`.
# The value can be on, off, or a duration.
# By default it is 'on' and then it is derived to be 3/4 of stable-after.
down-all-when-unstable = on
}
#//#split-brain-resolver
# Down the unreachable nodes if the number of remaining nodes is greater than or equal to
# the given 'quorum-size'. Otherwise down the reachable nodes, i.e. it will shut down that
# side of the partition. In other words, the 'quorum-size' defines the minimum number of nodes
# that the cluster must have to be operational. If there are unreachable nodes when starting
# up the cluster, before reaching this limit, the cluster may shut down itself immediately.
# This is not an issue if you start all nodes at approximately the same time.
#
# Note that you must not add more members to the cluster than 'quorum-size * 2 - 1', because
# then both sides may down each other and thereby form two separate clusters. For example,
# quorum-size configured to 3 in a 6 node cluster may result in a split where each side
# consists of 3 nodes each, i.e. each side thinks it has enough nodes to continue by
# itself. A warning is logged if this recommendation is violated.
#//#static-quorum
akka.cluster.split-brain-resolver.static-quorum {
# minimum number of nodes that the cluster must have
quorum-size = undefined
# if the 'role' is defined the decision is based only on members with that 'role'
role = ""
}
#//#static-quorum
# Down the unreachable nodes if the current node is in the majority part based on the last known
# membership information. Otherwise down the reachable nodes, i.e. the own part. If the
# parts are of equal size, the part containing the node with the lowest address is kept.
# Note that if there are more than two partitions and none is in majority, each part
# will shut down itself, terminating the whole cluster.
#//#keep-majority
akka.cluster.split-brain-resolver.keep-majority {
# if the 'role' is defined the decision is based only on members with that 'role'
role = ""
}
#//#keep-majority
# Down the part that does not contain the oldest member (current singleton).
#
# There is one exception to this rule if 'down-if-alone' is defined to 'on'.
# Then, if the oldest node has partitioned from all other nodes the oldest
# will down itself and keep all other nodes running. The strategy will not
# down the single oldest node when it is the only remaining node in the cluster.
#
# Note that if the oldest node crashes, the others will remove it from the cluster
# when 'down-if-alone' is 'on'; otherwise they will down themselves when the
# oldest node crashes, i.e. shut down the whole cluster together with the oldest node.
#//#keep-oldest
akka.cluster.split-brain-resolver.keep-oldest {
# Enable downing of the oldest node when it is partitioned from all other nodes
down-if-alone = on
# if the 'role' is defined the decision is based only on members with that 'role',
# i.e. using the oldest member (singleton) within the nodes with that role
role = ""
}
#//#keep-oldest
# Keep the part that can acquire the lease, and down the other part.
# Best effort is to keep the side that has the most nodes, i.e. the majority side.
# This is achieved by adding a delay before trying to acquire the lease on the
# minority side.
#//#lease-majority
akka.cluster.split-brain-resolver.lease-majority {
lease-implementation = ""
# This delay is used on the minority side before trying to acquire the lease,
# as a best effort to keep the majority side.
acquire-lease-delay-for-minority = 2s
# If the 'role' is defined the majority/minority is based only on members with that 'role'.
role = ""
}
#//#lease-majority
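
As a usage note, a strategy other than the keep-majority default is chosen by overriding active-strategy together with that strategy's own block; for static-quorum the quorum-size above is 'undefined' and must be set. A sketch (the quorum value is illustrative):

    import com.typesafe.config.ConfigFactory

    // Sketch only: selecting static-quorum instead of the keep-majority default.
    val staticQuorumConfig = ConfigFactory.parseString("""
      akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
      akka.cluster.split-brain-resolver {
        active-strategy = static-quorum
        # minimum number of nodes the cluster must have to stay up; with quorum-size = 3
        # do not grow the cluster beyond quorum-size * 2 - 1 = 5 members
        static-quorum.quorum-size = 3
      }
      """)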


@@ -40,9 +40,7 @@ private[cluster] object DowningProvider {
* When implementing a downing provider you should make sure that it will not split the cluster into
* several separate clusters in case of network problems or system overload (long GC pauses). This
* is much more difficult than it might be perceived at first, so carefully read the concerns and scenarios
* described in
* https://doc.akka.io/docs/akka/current/typed/cluster.html#downing and
* https://doc.akka.io/docs/akka-enhancements/current/split-brain-resolver.html
* described in https://doc.akka.io/docs/akka/current/split-brain-resolver.html
*/
abstract class DowningProvider {


@@ -0,0 +1,625 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import akka.actor.Address
import akka.annotation.InternalApi
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.cluster.Reachability
import akka.cluster.UniqueAddress
import akka.coordination.lease.scaladsl.Lease
/**
* INTERNAL API
*/
@InternalApi private[sbr] object DowningStrategy {
sealed trait Decision {
def isIndirectlyConnected: Boolean
}
case object DownReachable extends Decision {
override def isIndirectlyConnected = false
}
case object DownUnreachable extends Decision {
override def isIndirectlyConnected = false
}
case object DownAll extends Decision {
override def isIndirectlyConnected = false
}
case object DownIndirectlyConnected extends Decision {
override def isIndirectlyConnected = true
}
sealed trait AcquireLeaseDecision extends Decision {
def acquireDelay: FiniteDuration
}
final case class AcquireLeaseAndDownUnreachable(acquireDelay: FiniteDuration) extends AcquireLeaseDecision {
override def isIndirectlyConnected = false
}
final case class AcquireLeaseAndDownIndirectlyConnected(acquireDelay: FiniteDuration) extends AcquireLeaseDecision {
override def isIndirectlyConnected = true
}
case object ReverseDownIndirectlyConnected extends Decision {
override def isIndirectlyConnected = true
}
}
/**
* INTERNAL API
*/
@InternalApi private[sbr] abstract class DowningStrategy(val selfDc: DataCenter) {
import DowningStrategy._
// may contain Joining and WeaklyUp
private var _unreachable: Set[UniqueAddress] = Set.empty[UniqueAddress]
def unreachable: Set[UniqueAddress] = _unreachable
def unreachable(m: Member): Boolean = _unreachable(m.uniqueAddress)
private var _reachability: Reachability = Reachability.empty
private var _seenBy: Set[Address] = Set.empty
protected def ordering: Ordering[Member] = Member.ordering
// all members in self DC, both joining and up.
private var _allMembers: immutable.SortedSet[Member] = immutable.SortedSet.empty(ordering)
def role: Option[String]
// all Joining and WeaklyUp members in self DC
def joining: immutable.SortedSet[Member] =
_allMembers.filter(m => m.status == MemberStatus.Joining || m.status == MemberStatus.WeaklyUp)
// all members in self DC, both joining and up.
def allMembersInDC: immutable.SortedSet[Member] = _allMembers
/**
* All members in self DC, but doesn't contain Joining, WeaklyUp, Down and Exiting.
*/
def members: immutable.SortedSet[Member] =
members(includingPossiblyUp = false, excludingPossiblyExiting = false)
/**
* All members in self DC, but doesn't contain Joining, WeaklyUp, Down and Exiting.
*
* When `includingPossiblyUp=true` it also includes Joining and WeaklyUp members that could have been
* changed to Up on the other side of a partition.
*
* When `excludingPossiblyExiting=true` it doesn't include Leaving members that could have been
* changed to Exiting on the other side of the partition.
*/
def members(includingPossiblyUp: Boolean, excludingPossiblyExiting: Boolean): immutable.SortedSet[Member] =
_allMembers.filterNot(
m =>
(!includingPossiblyUp && m.status == MemberStatus.Joining) ||
(!includingPossiblyUp && m.status == MemberStatus.WeaklyUp) ||
(excludingPossiblyExiting && m.status == MemberStatus.Leaving) ||
m.status == MemberStatus.Down ||
m.status == MemberStatus.Exiting)
def membersWithRole: immutable.SortedSet[Member] =
membersWithRole(includingPossiblyUp = false, excludingPossiblyExiting = false)
def membersWithRole(includingPossiblyUp: Boolean, excludingPossiblyExiting: Boolean): immutable.SortedSet[Member] =
role match {
case None => members(includingPossiblyUp, excludingPossiblyExiting)
case Some(r) => members(includingPossiblyUp, excludingPossiblyExiting).filter(_.hasRole(r))
}
def reachableMembers: immutable.SortedSet[Member] =
reachableMembers(includingPossiblyUp = false, excludingPossiblyExiting = false)
def reachableMembers(includingPossiblyUp: Boolean, excludingPossiblyExiting: Boolean): immutable.SortedSet[Member] = {
val mbrs = members(includingPossiblyUp, excludingPossiblyExiting)
if (unreachable.isEmpty) mbrs
else mbrs.filter(m => !unreachable(m))
}
def reachableMembersWithRole: immutable.SortedSet[Member] =
reachableMembersWithRole(includingPossiblyUp = false, excludingPossiblyExiting = false)
def reachableMembersWithRole(
includingPossiblyUp: Boolean,
excludingPossiblyExiting: Boolean): immutable.SortedSet[Member] =
role match {
case None => reachableMembers(includingPossiblyUp, excludingPossiblyExiting)
case Some(r) => reachableMembers(includingPossiblyUp, excludingPossiblyExiting).filter(_.hasRole(r))
}
def unreachableMembers: immutable.SortedSet[Member] =
unreachableMembers(includingPossiblyUp = false, excludingPossiblyExiting = false)
def unreachableMembers(
includingPossiblyUp: Boolean,
excludingPossiblyExiting: Boolean): immutable.SortedSet[Member] = {
if (unreachable.isEmpty) immutable.SortedSet.empty
else members(includingPossiblyUp, excludingPossiblyExiting).filter(unreachable)
}
def unreachableMembersWithRole: immutable.SortedSet[Member] =
unreachableMembersWithRole(includingPossiblyUp = false, excludingPossiblyExiting = false)
def unreachableMembersWithRole(
includingPossiblyUp: Boolean,
excludingPossiblyExiting: Boolean): immutable.SortedSet[Member] =
role match {
case None => unreachableMembers(includingPossiblyUp, excludingPossiblyExiting)
case Some(r) => unreachableMembers(includingPossiblyUp, excludingPossiblyExiting).filter(_.hasRole(r))
}
def addUnreachable(m: Member): Unit = {
require(m.dataCenter == selfDc)
add(m)
_unreachable = _unreachable + m.uniqueAddress
}
def addReachable(m: Member): Unit = {
require(m.dataCenter == selfDc)
add(m)
_unreachable = _unreachable - m.uniqueAddress
}
def add(m: Member): Unit = {
require(m.dataCenter == selfDc)
removeFromAllMembers(m)
_allMembers += m
}
def remove(m: Member): Unit = {
require(m.dataCenter == selfDc)
removeFromAllMembers(m)
_unreachable -= m.uniqueAddress
}
private def removeFromAllMembers(m: Member): Unit = {
if (ordering eq Member.ordering) {
_allMembers -= m
} else {
// must use filterNot for removals/replace in the SortedSet when
// ageOrdering is using upNumber and that will change when Joining -> Up
_allMembers = _allMembers.filterNot(_.uniqueAddress == m.uniqueAddress)
}
}
def reachability: Reachability =
_reachability
private def isInSelfDc(node: UniqueAddress): Boolean = {
_allMembers.exists(m => m.uniqueAddress == node && m.dataCenter == selfDc)
}
/**
* @return true if it was changed
*/
private[sbr] def setReachability(r: Reachability): Boolean = {
// skip records with Reachability.Reachable, and skip records related to other DC
val newReachability = r.filterRecords(
record =>
(record.status == Reachability.Unreachable || record.status == Reachability.Terminated) &&
isInSelfDc(record.observer) && isInSelfDc(record.subject))
val oldReachability = _reachability
val changed =
if (oldReachability.records.size != newReachability.records.size)
true
else
oldReachability.records.map(r => r.observer -> r.subject).toSet !=
newReachability.records.map(r => r.observer -> r.subject).toSet
_reachability = newReachability
changed
}
def seenBy: Set[Address] =
_seenBy
def setSeenBy(s: Set[Address]): Unit =
_seenBy = s
/**
* Nodes that are marked as unreachable but can communicate with gossip via a 3rd party.
*
* A cycle in the unreachability graph means that some node is both
* observing another node as unreachable and being observed as unreachable by someone
* else.
*
* Another indication of indirectly connected nodes is if a node is marked as unreachable,
* but it has still marked current gossip state as seen.
*
* Those cases will not happen for clean splits and crashed nodes.
*/
def indirectlyConnected: Set[UniqueAddress] = {
indirectlyConnectedFromIntersectionOfObserversAndSubjects.union(indirectlyConnectedFromSeenCurrentGossip)
}
private def indirectlyConnectedFromIntersectionOfObserversAndSubjects: Set[UniqueAddress] = {
// cycle in unreachability graph
val observers = reachability.allObservers
observers.intersect(reachability.allUnreachableOrTerminated)
}
private def indirectlyConnectedFromSeenCurrentGossip: Set[UniqueAddress] = {
reachability.records.flatMap { r =>
if (seenBy(r.subject.address)) r.observer :: r.subject :: Nil
else Nil
}.toSet
}
def hasIndirectlyConnected: Boolean = indirectlyConnected.nonEmpty
def unreachableButNotIndirectlyConnected: Set[UniqueAddress] = unreachable.diff(indirectlyConnected)
def nodesToDown(decision: Decision = decide()): Set[UniqueAddress] = {
val downable = members
.union(joining)
.filterNot(m => m.status == MemberStatus.Down || m.status == MemberStatus.Exiting)
.map(_.uniqueAddress)
decision match {
case DownUnreachable | AcquireLeaseAndDownUnreachable(_) => downable.intersect(unreachable)
case DownReachable => downable.diff(unreachable)
case DownAll => downable
case DownIndirectlyConnected | AcquireLeaseAndDownIndirectlyConnected(_) =>
// Down nodes that have been marked as unreachable via some network links but they are still indirectly
// connected via other links. It will keep other "normal" nodes.
// If there is a combination of indirectly connected nodes and a clean network partition (or node crashes)
// it will combine the above decision with the ordinary decision, e.g. keep majority, after excluding
// failure detection observations between the indirectly connected nodes.
// Also include nodes that correspond to the decision without the unreachability observations from
// the indirectly connected nodes
downable.intersect(indirectlyConnected.union(additionalNodesToDownWhenIndirectlyConnected))
case ReverseDownIndirectlyConnected =>
// indirectly connected + all reachable
downable.intersect(indirectlyConnected).union(downable.diff(unreachable))
}
}
private def additionalNodesToDownWhenIndirectlyConnected: Set[UniqueAddress] = {
if (unreachableButNotIndirectlyConnected.isEmpty)
Set.empty
else {
val originalUnreachable = _unreachable
val originalReachability = _reachability
try {
val intersectionOfObserversAndSubjects = indirectlyConnectedFromIntersectionOfObserversAndSubjects
val haveSeenCurrentGossip = indirectlyConnectedFromSeenCurrentGossip
// remove records between the indirectly connected
_reachability = reachability.filterRecords(
r =>
!((intersectionOfObserversAndSubjects(r.observer) && intersectionOfObserversAndSubjects(r.subject)) ||
(haveSeenCurrentGossip(r.observer) && haveSeenCurrentGossip(r.subject))))
_unreachable = reachability.allUnreachableOrTerminated
val additionalDecision = decide()
if (additionalDecision.isIndirectlyConnected)
throw new IllegalStateException(
s"SBR double $additionalDecision decision, downing all instead. " +
s"originalReachability: [$originalReachability], filtered reachability [$reachability], " +
s"still indirectlyConnected: [$indirectlyConnected], seenBy: [$seenBy]")
nodesToDown(additionalDecision)
} finally {
_unreachable = originalUnreachable
_reachability = originalReachability
}
}
}
def isAllUnreachableDownOrExiting: Boolean = {
_unreachable.isEmpty ||
unreachableMembers.forall(m => m.status == MemberStatus.Down || m.status == MemberStatus.Exiting)
}
def reverseDecision(decision: Decision): Decision = {
decision match {
case DownUnreachable => DownReachable
case AcquireLeaseAndDownUnreachable(_) => DownReachable
case DownReachable => DownUnreachable
case DownAll => DownAll
case DownIndirectlyConnected => ReverseDownIndirectlyConnected
case AcquireLeaseAndDownIndirectlyConnected(_) => ReverseDownIndirectlyConnected
case ReverseDownIndirectlyConnected => DownIndirectlyConnected
}
}
def decide(): Decision
def lease: Option[Lease] = None
}
/**
* INTERNAL API
*
* Down the unreachable nodes if the number of remaining nodes is greater than or equal to the
* given `quorumSize`. Otherwise down the reachable nodes, i.e. it will shut down that side of the partition.
* In other words, the `quorumSize` defines the minimum number of nodes that the cluster must have to be operational.
* If there are unreachable nodes when starting up the cluster, before reaching this limit,
* the cluster may shut itself down immediately. This is not an issue if you start all nodes at
* approximately the same time.
*
* Note that you must not add more members to the cluster than `quorumSize * 2 - 1`, because then
* both sides may down each other and thereby form two separate clusters. For example,
* a `quorumSize` configured to 3 in a 6 node cluster may result in a split where each side
* consists of 3 nodes, i.e. each side thinks it has enough nodes to continue by
* itself. A warning is logged if this recommendation is violated.
*
* If the `role` is defined the decision is based only on members with that `role`.
*
* It is only counting members within the own data center.
*/
@InternalApi private[sbr] final class StaticQuorum(
selfDc: DataCenter,
val quorumSize: Int,
override val role: Option[String])
extends DowningStrategy(selfDc) {
import DowningStrategy._
override def decide(): Decision = {
if (isTooManyMembers)
DownAll
else if (hasIndirectlyConnected)
DownIndirectlyConnected
else if (membersWithRole.size - unreachableMembersWithRole.size >= quorumSize)
DownUnreachable
else
DownReachable
}
def isTooManyMembers: Boolean =
membersWithRole.size > (quorumSize * 2 - 1)
}
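// Configuration sketch (illustrative only, not part of this commit) that would select this strategy
// through SplitBrainResolverSettings; the quorum-size of 3 is just an example value:
//   akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
//   akka.cluster.split-brain-resolver {
//     active-strategy = static-quorum
//     static-quorum.quorum-size = 3   # cluster should then not grow beyond 2 * 3 - 1 = 5 members
//   }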
/**
* INTERNAL API
*
* Down the unreachable nodes if the current node is in the majority part based on the last known
* membership information. Otherwise down the reachable nodes, i.e. the own part. If the
* parts are of equal size the part containing the node with the lowest address is kept.
*
* If the `role` is defined the decision is based only on members with that `role`.
*
* Note that if there are more than two partitions and none is in majority each part
* will shutdown itself, terminating the whole cluster.
*
* It is only counting members within the own data center.
*/
@InternalApi private[sbr] final class KeepMajority(selfDc: DataCenter, override val role: Option[String])
extends DowningStrategy(selfDc) {
import DowningStrategy._
override def decide(): Decision = {
if (hasIndirectlyConnected)
DownIndirectlyConnected
else {
val ms = membersWithRole
if (ms.isEmpty)
DownAll // no node with matching role
else {
val reachableSize = reachableMembersWithRole.size
val unreachableSize = unreachableMembersWithRole.size
majorityDecision(reachableSize, unreachableSize, ms.head) match {
case DownUnreachable =>
majorityDecisionWhenIncludingMembershipChangesEdgeCase() match {
case DownUnreachable => DownUnreachable // same conclusion
case _ => DownAll // different conclusion, safest to DownAll
}
case decision => decision
}
}
}
}
private def majorityDecision(thisSide: Int, otherSide: Int, lowest: Member): Decision = {
if (thisSide == otherSide) {
// equal size, keep the side with the lowest address (first in members)
if (unreachable(lowest)) DownReachable else DownUnreachable
} else if (thisSide > otherSide) {
// we are in majority
DownUnreachable
} else {
// we are in minority
DownReachable
}
}
/**
* Check for edge case when membership change happens at the same time as partition.
* Count Joining and WeaklyUp on other side since those might be Up on other side.
* Don't count Leaving on this side since those might be Exiting on other side.
* Note that the membership changes we are looking for will only be done when all
* members have seen previous state, i.e. when a member is moved to Up everybody
* has seen it joining.
*/
private def majorityDecisionWhenIncludingMembershipChangesEdgeCase(): Decision = {
// for this side we count as few as could be possible (excluding joining, excluding leaving)
val ms = membersWithRole(includingPossiblyUp = false, excludingPossiblyExiting = true)
if (ms.isEmpty) {
DownAll
} else {
val thisSideReachableSize =
reachableMembersWithRole(includingPossiblyUp = false, excludingPossiblyExiting = true).size
// for other side we count as many as could be possible (including joining, including leaving)
val otherSideUnreachableSize =
unreachableMembersWithRole(includingPossiblyUp = true, excludingPossiblyExiting = false).size
majorityDecision(thisSideReachableSize, otherSideUnreachableSize, ms.head)
}
}
}
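// Configuration sketch (illustrative): keep-majority limited to members with a given role;
// the role name "backend" is an assumed example value:
//   akka.cluster.split-brain-resolver {
//     active-strategy = keep-majority
//     keep-majority.role = "backend"
//   }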
/**
* INTERNAL API
*
* Down the part that does not contain the oldest member (current singleton).
*
* There is one exception to this rule if `downIfAlone` is set to `true`.
* Then, if the oldest node has partitioned from all other nodes, the oldest will
* down itself and keep all other nodes running. The strategy will not down the
* single oldest node when it is the only remaining node in the cluster.
*
* Note that if the oldest node crashes the others will remove it from the cluster
* when `downIfAlone` is `true`. Otherwise they will down themselves if the
* oldest node crashes, i.e. shut down the whole cluster together with the oldest node.
*
* If the `role` is defined the decision is based only on members with that `role`,
* i.e. using the oldest member (singleton) within the nodes with that role.
*
* It is only using members within the own data center, i.e. oldest within the
* data center.
*/
@InternalApi private[sbr] final class KeepOldest(
selfDc: DataCenter,
val downIfAlone: Boolean,
override val role: Option[String])
extends DowningStrategy(selfDc) {
import DowningStrategy._
// sort by age, oldest first
override def ordering: Ordering[Member] = Member.ageOrdering
override def decide(): Decision = {
if (hasIndirectlyConnected)
DownIndirectlyConnected
else {
val ms = membersWithRole
if (ms.isEmpty)
DownAll // no node with matching role
else {
val oldest = ms.head
val oldestIsReachable = !unreachable(oldest)
val reachableCount = reachableMembersWithRole.size
val unreachableCount = unreachableMembersWithRole.size
oldestDecision(oldestIsReachable, reachableCount, unreachableCount) match {
case DownUnreachable =>
oldestDecisionWhenIncludingMembershipChangesEdgeCase() match {
case DownUnreachable => DownUnreachable // same conclusion
case _ => DownAll // different conclusion, safest to DownAll
}
case decision => decision
}
}
}
}
private def oldestDecision(oldestIsOnThisSide: Boolean, thisSide: Int, otherSide: Int): Decision = {
if (oldestIsOnThisSide) {
// if there are only 2 nodes in the cluster it is better to keep the oldest, even though it is alone
// E.g. 2 nodes: thisSide=1, otherSide=1 => DownUnreachable, i.e. keep the oldest
// even though it is alone (because the node on the other side is no better)
// E.g. 3 nodes: thisSide=1, otherSide=2 => DownReachable, i.e. shut down the
// oldest because it is alone
if (downIfAlone && thisSide == 1 && otherSide >= 2) DownReachable
else DownUnreachable
} else {
if (downIfAlone && otherSide == 1 && thisSide >= 2) DownUnreachable
else DownReachable
}
}
/**
* Check for edge case when membership change happens at the same time as partition.
* Exclude Leaving on this side because those could be Exiting on other side.
*
* When `downIfAlone` is enabled, also consider Joining and WeaklyUp since those might be Up on the other side,
* and thereby flip the alone test.
*/
private def oldestDecisionWhenIncludingMembershipChangesEdgeCase(): Decision = {
val ms = membersWithRole(includingPossiblyUp = false, excludingPossiblyExiting = true)
if (ms.isEmpty) {
DownAll
} else {
val oldest = ms.head
val oldestIsReachable = !unreachable(oldest)
// Joining and WeaklyUp are only relevant when downIfAlone = true
val includingPossiblyUp = downIfAlone
val reachableCount = reachableMembersWithRole(includingPossiblyUp, excludingPossiblyExiting = true).size
val unreachableCount = unreachableMembersWithRole(includingPossiblyUp, excludingPossiblyExiting = true).size
oldestDecision(oldestIsReachable, reachableCount, unreachableCount)
}
}
}
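// Configuration sketch (illustrative) for this strategy, using the down-if-alone and role keys
// read by SplitBrainResolverSettings.keepOldestSettings:
//   akka.cluster.split-brain-resolver {
//     active-strategy = keep-oldest
//     keep-oldest.down-if-alone = on
//   }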
/**
* INTERNAL API
*
* Down all nodes unconditionally.
*/
@InternalApi private[sbr] final class DownAllNodes(selfDc: DataCenter) extends DowningStrategy(selfDc) {
import DowningStrategy._
override def decide(): Decision =
DownAll
override def role: Option[String] = None
}
/**
* INTERNAL API
*
* Keep the part that can acquire the lease, and down the other part.
*
* Best effort is to keep the side that has the most nodes, i.e. the majority side.
* This is achieved by adding a delay before trying to acquire the lease on the
* minority side.
*
* If the `role` is defined the majority/minority is based only on members with that `role`.
* It is only counting members within the own data center.
*/
@InternalApi private[sbr] final class LeaseMajority(
selfDc: DataCenter,
override val role: Option[String],
_lease: Lease,
acquireLeaseDelayForMinority: FiniteDuration)
extends DowningStrategy(selfDc) {
import DowningStrategy._
override val lease: Option[Lease] = Some(_lease)
override def decide(): Decision = {
if (hasIndirectlyConnected)
AcquireLeaseAndDownIndirectlyConnected(Duration.Zero)
else
AcquireLeaseAndDownUnreachable(acquireLeaseDelay)
}
private def acquireLeaseDelay: FiniteDuration =
if (isInMinority) acquireLeaseDelayForMinority else Duration.Zero
private def isInMinority: Boolean = {
val ms = membersWithRole
if (ms.isEmpty)
false // no node with matching role
else {
val unreachableSize = unreachableMembersWithRole.size
val membersSize = ms.size
if (unreachableSize * 2 == membersSize) {
// equal size, try to keep the side with the lowest address (first in members)
unreachable(ms.head)
} else if (unreachableSize * 2 < membersSize) {
// we are in majority
false
} else {
// we are in minority
true
}
}
}
}
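// Configuration sketch (illustrative; "my-lease" must refer to a lease implementation configured
// elsewhere, and 2s is just an example delay):
//   akka.cluster.split-brain-resolver {
//     active-strategy = lease-majority
//     lease-majority {
//       lease-implementation = "my-lease"
//       acquire-lease-delay-for-minority = 2s
//     }
//   }
// Worked example of isInMinority: with 5 members of which 2 are unreachable, 2 * 2 < 5, so this side
// is in majority and tries to acquire the lease without delay, while the minority side waits
// acquire-lease-delay-for-minority before its attempt.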

View file

@ -0,0 +1,577 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import java.time.Instant
import java.time.temporal.ChronoUnit
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Props
import akka.actor.Stash
import akka.actor.Timers
import akka.annotation.InternalApi
import akka.cluster.Cluster
import akka.cluster.ClusterEvent
import akka.cluster.ClusterEvent._
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.Member
import akka.cluster.Reachability
import akka.cluster.UniqueAddress
import akka.pattern.pipe
/**
* INTERNAL API
*/
@InternalApi private[sbr] object SplitBrainResolver {
def props(stableAfter: FiniteDuration, strategy: DowningStrategy): Props =
Props(new SplitBrainResolver(stableAfter, strategy))
case object Tick
/**
* Response (result) of the acquire lease request.
*/
final case class AcquireLeaseResult(holdingLease: Boolean)
/**
* Response (result) of the release lease request.
*/
final case class ReleaseLeaseResult(released: Boolean)
/**
* For delayed acquire of the lease.
*/
case object AcquireLease
sealed trait ReleaseLeaseCondition
object ReleaseLeaseCondition {
case object NoLease extends ReleaseLeaseCondition
final case class WhenMembersRemoved(nodes: Set[UniqueAddress]) extends ReleaseLeaseCondition
final case class WhenTimeElapsed(deadline: Deadline) extends ReleaseLeaseCondition
}
final case class ReachabilityChangedStats(
firstChangeTimestamp: Long,
latestChangeTimestamp: Long,
changeCount: Long) {
def isEmpty: Boolean =
changeCount == 0
override def toString: String = {
if (isEmpty)
"reachability unchanged"
else {
val now = System.nanoTime()
s"reachability changed $changeCount times since ${(now - firstChangeTimestamp).nanos.toMillis} ms ago, " +
s"latest change was ${(now - latestChangeTimestamp).nanos.toMillis} ms ago"
}
}
}
}
/**
* INTERNAL API
*
* Unreachable members will be downed by this actor according to the given strategy.
* It is active on the leader node in the cluster.
*
* The implementation is split into two classes SplitBrainResolver and SplitBrainResolverBase to be
* able to unit test the logic without a running cluster.
*/
@InternalApi private[sbr] final class SplitBrainResolver(stableAfter: FiniteDuration, strategy: DowningStrategy)
extends SplitBrainResolverBase(stableAfter, strategy) {
private val cluster = Cluster(context.system)
log.info(
"SBR started. Config: stableAfter: {} ms, strategy: {}, selfUniqueAddress: {}, selfDc: {}",
stableAfter.toMillis,
strategy.getClass.getSimpleName,
selfUniqueAddress,
selfDc)
override def selfUniqueAddress: UniqueAddress = cluster.selfUniqueAddress
override def selfDc: DataCenter = cluster.selfDataCenter
// re-subscribe when restart
override def preStart(): Unit = {
cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[ClusterDomainEvent])
super.preStart()
}
override def postStop(): Unit = {
cluster.unsubscribe(self)
super.postStop()
}
override def down(node: Address): Unit = {
cluster.down(node)
}
}
/**
* INTERNAL API
*
* The implementation is split into two classes SplitBrainResolver and SplitBrainResolverBase to be
* able to unit test the logic without a running cluster.
*/
@InternalApi private[sbr] abstract class SplitBrainResolverBase(stableAfter: FiniteDuration, strategy: DowningStrategy)
extends Actor
with ActorLogging
with Stash
with Timers {
import DowningStrategy._
import SplitBrainResolver.ReleaseLeaseCondition.NoLease
import SplitBrainResolver._
def selfUniqueAddress: UniqueAddress
def selfDc: DataCenter
def down(node: Address): Unit
// would be better as constructor parameter, but don't want to break Cinnamon instrumentation
private val settings = new SplitBrainResolverSettings(context.system.settings.config)
def downAllWhenUnstable: FiniteDuration =
settings.DownAllWhenUnstable
private val releaseLeaseAfter = stableAfter * 2
def tickInterval: FiniteDuration = 1.second
timers.startTimerWithFixedDelay(Tick, Tick, tickInterval)
var leader = false
var selfMemberAdded = false
private def internalDispatcher: ExecutionContext =
context.system.asInstanceOf[ExtendedActorSystem].dispatchers.internalDispatcher
// overridden in tests
protected def newStableDeadline(): Deadline = Deadline.now + stableAfter
var stableDeadline: Deadline = _
def resetStableDeadline(): Unit = {
stableDeadline = newStableDeadline()
}
resetStableDeadline()
private var reachabilityChangedStats: ReachabilityChangedStats =
ReachabilityChangedStats(System.nanoTime(), System.nanoTime(), 0)
private def resetReachabilityChangedStats(): Unit = {
val now = System.nanoTime()
reachabilityChangedStats = ReachabilityChangedStats(now, now, 0)
}
private def resetReachabilityChangedStatsIfAllUnreachableDowned(): Unit = {
if (!reachabilityChangedStats.isEmpty && strategy.isAllUnreachableDownOrExiting) {
log.debug("SBR resetting reachability stats, after all unreachable healed, downed or removed")
resetReachabilityChangedStats()
}
}
private var releaseLeaseCondition: ReleaseLeaseCondition = NoLease
/** Helper to wrap updates to strategy info so that the stable-after timer is reset and state changes are logged */
def mutateMemberInfo(resetStable: Boolean)(f: () => Unit): Unit = {
val unreachableBefore = strategy.unreachable.size
f()
val unreachableAfter = strategy.unreachable.size
def earliestTimeOfDecision: String =
Instant.now().plus(stableAfter.toMillis, ChronoUnit.MILLIS).toString
if (resetStable) {
if (isResponsible) {
if (unreachableBefore == 0 && unreachableAfter > 0) {
log.info(
"SBR found unreachable members, waiting for stable-after = {} ms before taking downing decision. " +
"Now {} unreachable members found. Downing decision will not be made before {}.",
stableAfter.toMillis,
unreachableAfter,
earliestTimeOfDecision)
} else if (unreachableBefore > 0 && unreachableAfter == 0) {
log.info(
"SBR found all unreachable members healed during stable-after period, no downing decision necessary for now.")
} else if (unreachableAfter > 0) {
log.info(
"SBR found unreachable members changed during stable-after period. Resetting timer. " +
"Now {} unreachable members found. Downing decision will not be made before {}.",
unreachableAfter,
earliestTimeOfDecision)
}
// else no unreachable members found but set of members changed
}
log.debug("SBR reset stable deadline when members/unreachable changed")
resetStableDeadline()
}
}
/** Helper to wrap updates to `leader` and `selfMemberAdded` to log changes in responsibility status */
def mutateResponsibilityInfo(f: () => Unit): Unit = {
val responsibleBefore = isResponsible
f()
val responsibleAfter = isResponsible
if (!responsibleBefore && responsibleAfter)
log.info(
"This node is now the leader responsible for taking SBR decisions among the reachable nodes " +
"(more leaders may exist).")
else if (responsibleBefore && !responsibleAfter)
log.info("This node is not the leader any more and not responsible for taking SBR decisions.")
if (leader && !selfMemberAdded)
log.debug("This node is leader but !selfMemberAdded.")
}
private var unreachableDataCenters = Set.empty[DataCenter]
override def postStop(): Unit = {
if (releaseLeaseCondition != NoLease) {
log.info(
"SBR is stopped and owns the lease. The lease will not be released until after the " +
"lease heartbeat-timeout.")
}
super.postStop()
}
def receive: Receive = {
case SeenChanged(_, seenBy) => seenChanged(seenBy)
case MemberJoined(m) => addJoining(m)
case MemberWeaklyUp(m) => addWeaklyUp(m)
case MemberUp(m) => addUp(m)
case MemberLeft(m) => leaving(m)
case UnreachableMember(m) => unreachableMember(m)
case MemberDowned(m) => unreachableMember(m)
case MemberExited(m) => unreachableMember(m)
case ReachableMember(m) => reachableMember(m)
case ReachabilityChanged(r) => reachabilityChanged(r)
case MemberRemoved(m, _) => remove(m)
case UnreachableDataCenter(dc) => unreachableDataCenter(dc)
case ReachableDataCenter(dc) => reachableDataCenter(dc)
case LeaderChanged(leaderOption) => leaderChanged(leaderOption)
case ReleaseLeaseResult(released) => releaseLeaseResult(released)
case Tick => tick()
case _: ClusterDomainEvent => // not interested in other events
}
private def leaderChanged(leaderOption: Option[Address]): Unit = {
mutateResponsibilityInfo { () =>
leader = leaderOption.contains(selfUniqueAddress.address)
}
}
private def tick(): Unit = {
// note the DownAll due to instability is running on all nodes to make that decision as quickly and
// aggressively as possible if time is out
if (reachabilityChangedStats.changeCount > 0) {
val now = System.nanoTime()
val durationSinceLatestChange = (now - reachabilityChangedStats.latestChangeTimestamp).nanos
val durationSinceFirstChange = (now - reachabilityChangedStats.firstChangeTimestamp).nanos
if (durationSinceLatestChange > (stableAfter * 2)) {
log.debug("SBR no reachability changes within {} ms, resetting stats", (stableAfter * 2).toMillis)
resetReachabilityChangedStats()
} else if (downAllWhenUnstable > Duration.Zero &&
durationSinceFirstChange > (stableAfter + downAllWhenUnstable)) {
log.warning("SBR detected instability and will down all nodes: {}", reachabilityChangedStats)
actOnDecision(DownAll)
}
}
if (isResponsible && strategy.unreachable.nonEmpty && stableDeadline.isOverdue()) {
strategy.decide() match {
case decision: AcquireLeaseDecision =>
strategy.lease match {
case Some(lease) =>
if (lease.checkLease()) {
log.info("SBR has acquired lease for decision [{}]", decision)
actOnDecision(decision)
} else {
if (decision.acquireDelay == Duration.Zero)
acquireLease() // reply message is AcquireLeaseResult
else {
log.debug("SBR delayed attempt to acquire lease for [{} ms]", decision.acquireDelay.toMillis)
timers.startSingleTimer(AcquireLease, AcquireLease, decision.acquireDelay)
}
context.become(waitingForLease(decision))
}
case None =>
throw new IllegalStateException("Unexpected lease decision although lease is not configured")
}
case decision =>
actOnDecision(decision)
}
}
releaseLeaseCondition match {
case ReleaseLeaseCondition.WhenTimeElapsed(deadline) =>
if (deadline.isOverdue())
releaseLease() // reply message is ReleaseLeaseResult, which will update the releaseLeaseCondition
case _ =>
// no lease or first waiting for downed nodes to be removed
}
}
private def acquireLease(): Unit = {
log.debug("SBR trying to acquire lease")
implicit val ec: ExecutionContext = internalDispatcher
strategy.lease.foreach(
_.acquire()
.recover {
case t =>
log.error(t, "SBR acquire of lease failed")
false
}
.map(AcquireLeaseResult)
.pipeTo(self))
}
def waitingForLease(decision: Decision): Receive = {
case AcquireLease =>
acquireLease() // reply message is LeaseResult
case AcquireLeaseResult(holdingLease) =>
if (holdingLease) {
log.info("SBR acquired lease for decision [{}]", decision)
val downedNodes = actOnDecision(decision)
releaseLeaseCondition = releaseLeaseCondition match {
case ReleaseLeaseCondition.WhenMembersRemoved(nodes) =>
ReleaseLeaseCondition.WhenMembersRemoved(nodes.union(downedNodes))
case _ =>
if (downedNodes.isEmpty)
ReleaseLeaseCondition.WhenTimeElapsed(Deadline.now + releaseLeaseAfter)
else
ReleaseLeaseCondition.WhenMembersRemoved(downedNodes)
}
} else {
val reverseDecision = strategy.reverseDecision(decision)
log.info("SBR couldn't acquire lease, reverse decision [{}] to [{}]", decision, reverseDecision)
actOnDecision(reverseDecision)
releaseLeaseCondition = NoLease
}
unstashAll()
context.become(receive)
case ReleaseLeaseResult(_) => // superseded by new acquire release request
case Tick => // ignore ticks while waiting
case _ =>
stash()
}
private def releaseLeaseResult(released: Boolean): Unit = {
releaseLeaseCondition match {
case ReleaseLeaseCondition.WhenTimeElapsed(deadline) =>
if (released && deadline.isOverdue())
releaseLeaseCondition = NoLease // released successfully
case _ =>
// no lease or first waiting for downed nodes to be removed
}
}
/**
* @return the nodes that were downed
*/
def actOnDecision(decision: Decision): Set[UniqueAddress] = {
val nodesToDown =
try {
strategy.nodesToDown(decision)
} catch {
case e: IllegalStateException =>
log.warning(e.getMessage)
strategy.nodesToDown(DownAll)
}
val downMyself = nodesToDown.contains(selfUniqueAddress)
val indirectlyConnectedLogMessage =
if (decision.isIndirectlyConnected)
s", indirectly connected [${strategy.indirectlyConnected.mkString(", ")}]"
else ""
val unreachableDataCentersLogMessage =
if (unreachableDataCenters.nonEmpty)
s", unreachable DCs [${unreachableDataCenters.mkString(", ")}]"
else ""
log.warning(
s"SBR took decision $decision and is downing [${nodesToDown.map(_.address).mkString(", ")}]${if (downMyself) " including myself,"
else ""}, " +
s"[${strategy.unreachable.size}] unreachable of [${strategy.members.size}] members" +
indirectlyConnectedLogMessage +
s", all members in DC [${strategy.allMembersInDC.mkString(", ")}], full reachability status: ${strategy.reachability}" +
unreachableDataCentersLogMessage)
if (nodesToDown.nonEmpty) {
// downing is idempotent, and we also avoid calling down on nodes with status Down
// down selfAddress last, since it may shutdown itself if down alone
nodesToDown.foreach(uniqueAddress => if (uniqueAddress != selfUniqueAddress) down(uniqueAddress.address))
if (downMyself)
down(selfUniqueAddress.address)
resetReachabilityChangedStats()
resetStableDeadline()
}
nodesToDown
}
def isResponsible: Boolean = leader && selfMemberAdded
def unreachableMember(m: Member): Unit = {
if (m.uniqueAddress != selfUniqueAddress && m.dataCenter == selfDc) {
log.debug("SBR unreachableMember [{}]", m)
mutateMemberInfo(resetStable = true) { () =>
strategy.addUnreachable(m)
resetReachabilityChangedStatsIfAllUnreachableDowned()
}
}
}
def reachableMember(m: Member): Unit = {
if (m.uniqueAddress != selfUniqueAddress && m.dataCenter == selfDc) {
log.debug("SBR reachableMember [{}]", m)
mutateMemberInfo(resetStable = true) { () =>
strategy.addReachable(m)
resetReachabilityChangedStatsIfAllUnreachableDowned()
}
}
}
private[sbr] def reachabilityChanged(r: Reachability): Unit = {
if (strategy.setReachability(r)) {
// resetStableDeadline is done from unreachableMember/reachableMember
updateReachabilityChangedStats()
// it may also change when members are removed and therefore the reset may be needed
resetReachabilityChangedStatsIfAllUnreachableDowned()
log.debug("SBR noticed {}", reachabilityChangedStats)
}
}
private def updateReachabilityChangedStats(): Unit = {
val now = System.nanoTime()
if (reachabilityChangedStats.changeCount == 0)
reachabilityChangedStats = ReachabilityChangedStats(now, now, 1)
else
reachabilityChangedStats = reachabilityChangedStats.copy(
latestChangeTimestamp = now,
changeCount = reachabilityChangedStats.changeCount + 1)
}
def unreachableDataCenter(dc: DataCenter): Unit = {
unreachableDataCenters += dc
log.warning(
"Data center [{}] observed as unreachable. " +
"Note that nodes in other data center will not be downed by SBR in this data center [{}]",
dc,
selfDc)
}
def reachableDataCenter(dc: DataCenter): Unit = {
unreachableDataCenters -= dc
log.info("Data center [] observed as reachable again", dc)
}
def seenChanged(seenBy: Set[Address]): Unit = {
strategy.setSeenBy(seenBy)
}
def addUp(m: Member): Unit = {
if (selfDc == m.dataCenter) {
log.debug("SBR add Up [{}]", m)
mutateMemberInfo(resetStable = true) { () =>
strategy.add(m)
if (m.uniqueAddress == selfUniqueAddress) mutateResponsibilityInfo { () =>
selfMemberAdded = true
}
}
strategy match {
case s: StaticQuorum =>
if (s.isTooManyMembers)
log.warning(
"The cluster size is [{}] and static-quorum.quorum-size is [{}]. You should not add " +
"more than [{}] (static-quorum.size * 2 - 1) members to the cluster. If the exceeded cluster size " +
"remains when a SBR decision is needed it will down all nodes.",
s.membersWithRole.size,
s.quorumSize,
s.quorumSize * 2 - 1)
case _ => // ok
}
}
}
def leaving(m: Member): Unit = {
if (selfDc == m.dataCenter) {
log.debug("SBR leaving [{}]", m)
mutateMemberInfo(resetStable = false) { () =>
strategy.add(m)
}
}
}
def addJoining(m: Member): Unit = {
if (selfDc == m.dataCenter) {
log.debug("SBR add Joining/WeaklyUp [{}]", m)
strategy.add(m)
}
}
def addWeaklyUp(m: Member): Unit = {
if (m.uniqueAddress == selfUniqueAddress) mutateResponsibilityInfo { () =>
selfMemberAdded = true
}
// treat WeaklyUp in same way as joining
addJoining(m)
}
def remove(m: Member): Unit = {
if (selfDc == m.dataCenter) {
if (m.uniqueAddress == selfUniqueAddress)
context.stop(self)
else
mutateMemberInfo(resetStable = false) { () =>
log.debug("SBR remove [{}]", m)
strategy.remove(m)
resetReachabilityChangedStatsIfAllUnreachableDowned()
releaseLeaseCondition = releaseLeaseCondition match {
case ReleaseLeaseCondition.WhenMembersRemoved(downedNodes) =>
val remainingDownedNodes = downedNodes - m.uniqueAddress
if (remainingDownedNodes.isEmpty)
ReleaseLeaseCondition.WhenTimeElapsed(Deadline.now + releaseLeaseAfter)
else
ReleaseLeaseCondition.WhenMembersRemoved(remainingDownedNodes)
case other =>
// no lease or not holding lease
other
}
}
}
}
private def releaseLease(): Unit = {
implicit val ec: ExecutionContext = internalDispatcher
strategy.lease.foreach { l =>
if (releaseLeaseCondition != NoLease) {
log.info("SBR releasing lease")
l.release().recover { case _ => false }.map(ReleaseLeaseResult.apply).pipeTo(self)
}
}
}
}

View file

@ -0,0 +1,63 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.DowningProvider
import akka.coordination.lease.scaladsl.LeaseProvider
/**
* See reference documentation: https://doc.akka.io/docs/akka/current/split-brain-resolver.html
*
* Enabled with configuration:
* {{{
* akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
* }}}
*/
final class SplitBrainResolverProvider(system: ActorSystem) extends DowningProvider {
private val settings = new SplitBrainResolverSettings(system.settings.config)
override def downRemovalMargin: FiniteDuration = {
// if down-removal-margin is defined we let it trump stable-after to allow
// for two different values for SBR downing and cluster tool stop/start after downing
val drm = Cluster(system).settings.DownRemovalMargin
if (drm != Duration.Zero) drm
else settings.DowningStableAfter
}
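// Example: if akka.cluster.down-removal-margin is set to e.g. 10s (an illustrative value) that value
// is used as the down-removal-margin, otherwise the SBR stable-after duration is used.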
override def downingActorProps: Option[Props] = {
import SplitBrainResolverSettings._
val cluster = Cluster(system)
val selfDc = cluster.selfDataCenter
val strategy =
settings.DowningStrategy match {
case KeepMajorityName =>
new KeepMajority(selfDc, settings.keepMajorityRole)
case StaticQuorumName =>
val s = settings.staticQuorumSettings
new StaticQuorum(selfDc, s.size, s.role)
case KeepOldestName =>
val s = settings.keepOldestSettings
new KeepOldest(selfDc, s.downIfAlone, s.role)
case DownAllName =>
new DownAllNodes(selfDc)
case LeaseMajorityName =>
val s = settings.leaseMajoritySettings
val leaseOwnerName = cluster.selfUniqueAddress.address.hostPort
val lease = LeaseProvider(system).getLease(s"${system.name}-akka-sbr", s.leaseImplementation, leaseOwnerName)
new LeaseMajority(selfDc, s.role, lease, s.acquireLeaseDelayForMinority)
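// e.g. for an ActorSystem named "MyCluster" (illustrative name) the lease name becomes
// "MyCluster-akka-sbr" and the lease owner is this node's host:port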
}
Some(SplitBrainResolver.props(settings.DowningStableAfter, strategy))
}
}

View file

@ -0,0 +1,128 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import java.util.Locale
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import com.typesafe.config.Config
import akka.ConfigurationException
import akka.annotation.InternalApi
import akka.util.Helpers
import akka.util.Helpers.Requiring
/**
* INTERNAL API
*/
@InternalApi private[sbr] object SplitBrainResolverSettings {
final val KeepMajorityName = "keep-majority"
final val LeaseMajorityName = "lease-majority"
final val StaticQuorumName = "static-quorum"
final val KeepOldestName = "keep-oldest"
final val DownAllName = "down-all"
def allStrategyNames =
Set(KeepMajorityName, LeaseMajorityName, StaticQuorumName, KeepOldestName, DownAllName)
}
/**
* INTERNAL API
*/
@InternalApi private[sbr] final class SplitBrainResolverSettings(config: Config) {
import SplitBrainResolverSettings._
private val cc = config.getConfig("akka.cluster.split-brain-resolver")
val DowningStableAfter: FiniteDuration = {
val key = "stable-after"
FiniteDuration(cc.getDuration(key).toMillis, TimeUnit.MILLISECONDS).requiring(_ >= Duration.Zero, key + " >= 0s")
}
val DowningStrategy: String =
cc.getString("active-strategy").toLowerCase(Locale.ROOT) match {
case strategyName if allStrategyNames(strategyName) => strategyName
case unknown =>
throw new ConfigurationException(
s"Unknown downing strategy [$unknown]. Select one of [${allStrategyNames.mkString(",")}]")
}
val DownAllWhenUnstable: FiniteDuration = {
val key = "down-all-when-unstable"
Helpers.toRootLowerCase(cc.getString("down-all-when-unstable")) match {
case "on" =>
// based on stable-after
DowningStableAfter * 3 / 4
case "off" =>
// disabled
Duration.Zero
case _ =>
FiniteDuration(cc.getDuration(key).toMillis, TimeUnit.MILLISECONDS)
.requiring(_ > Duration.Zero, key + " > 0s, or 'off' to disable")
}
}
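// Example: with stable-after = 20s (an illustrative value), "on" resolves to 20s * 3 / 4 = 15s,
// "off" disables the feature (Duration.Zero), and an explicit duration such as 7s is used as-is.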
// the individual sub-configs below should only be called when the strategy has been selected
def keepMajorityRole: Option[String] = role(strategyConfig(KeepMajorityName))
def staticQuorumSettings: StaticQuorumSettings = {
val c = strategyConfig(StaticQuorumName)
val size = c
.getInt("quorum-size")
.requiring(_ >= 1, s"akka.cluster.split-brain-resolver.$StaticQuorumName.quorum-size must be >= 1")
StaticQuorumSettings(size, role(c))
}
def keepOldestSettings: KeepOldestSettings = {
val c = strategyConfig(KeepOldestName)
val downIfAlone = c.getBoolean("down-if-alone")
KeepOldestSettings(downIfAlone, role(c))
}
def leaseMajoritySettings: LeaseMajoritySettings = {
val c = strategyConfig(LeaseMajorityName)
val leaseImplementation = c.getString("lease-implementation")
require(
leaseImplementation != "",
s"akka.cluster.split-brain-resolver.$LeaseMajorityName.lease-implementation must be defined")
val acquireLeaseDelayForMinority =
FiniteDuration(c.getDuration("acquire-lease-delay-for-minority").toMillis, TimeUnit.MILLISECONDS)
LeaseMajoritySettings(leaseImplementation, acquireLeaseDelayForMinority, role(c))
}
private def strategyConfig(strategyName: String): Config = cc.getConfig(strategyName)
private def role(c: Config): Option[String] = c.getString("role") match {
case "" => None
case r => Some(r)
}
}
/**
* INTERNAL API
*/
@InternalApi private[sbr] final case class StaticQuorumSettings(size: Int, role: Option[String])
/**
* INTERNAL API
*/
@InternalApi private[sbr] final case class KeepOldestSettings(downIfAlone: Boolean, role: Option[String])
/**
* INTERNAL API
*/
@InternalApi private[sbr] final case class LeaseMajoritySettings(
leaseImplementation: String,
acquireLeaseDelayForMinority: FiniteDuration,
role: Option[String])

View file

@ -31,7 +31,11 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
loggers = ["akka.testkit.TestEventListener"]
loglevel = INFO
remote.log-remote-lifecycle-events = off
cluster.failure-detector.monitored-by-nr-of-members = 3
cluster {
failure-detector.monitored-by-nr-of-members = 3
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver.active-strategy = keep-majority
}
}
"""))

View file

@ -0,0 +1,128 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter
object DownAllIndirectlyConnected5NodeSpec extends MultiNodeConfig {
val node1 = role("node1")
val node2 = role("node2")
val node3 = role("node3")
val node4 = role("node4")
val node5 = role("node5")
commonConfig(ConfigFactory.parseString("""
akka {
loglevel = INFO
cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver.active-strategy = keep-majority
split-brain-resolver.stable-after = 6s
run-coordinated-shutdown-when-down = off
}
actor.provider = cluster
test.filter-leeway = 10s
}
"""))
testTransport(on = true)
}
class DownAllIndirectlyConnected5NodeSpecMultiJvmNode1 extends DownAllIndirectlyConnected5NodeSpec
class DownAllIndirectlyConnected5NodeSpecMultiJvmNode2 extends DownAllIndirectlyConnected5NodeSpec
class DownAllIndirectlyConnected5NodeSpecMultiJvmNode3 extends DownAllIndirectlyConnected5NodeSpec
class DownAllIndirectlyConnected5NodeSpecMultiJvmNode4 extends DownAllIndirectlyConnected5NodeSpec
class DownAllIndirectlyConnected5NodeSpecMultiJvmNode5 extends DownAllIndirectlyConnected5NodeSpec
class DownAllIndirectlyConnected5NodeSpec
extends MultiNodeSpec(DownAllIndirectlyConnected5NodeSpec)
with MultiNodeClusterSpec {
import DownAllIndirectlyConnected5NodeSpec._
"A 5-node cluster with keep-one-indirectly-connected = off" should {
"down all when indirectly connected combined with clean partition" in {
val cluster = Cluster(system)
runOn(node1) {
cluster.join(cluster.selfAddress)
}
enterBarrier("node1 joined")
runOn(node2, node3, node4, node5) {
cluster.join(node(node1).address)
}
within(10.seconds) {
awaitAssert {
cluster.state.members.size should ===(5)
cluster.state.members.foreach {
_.status should ===(MemberStatus.Up)
}
}
}
enterBarrier("Cluster formed")
runOn(node1) {
for (x <- List(node1, node2, node3); y <- List(node4, node5)) {
testConductor.blackhole(x, y, ThrottlerTransportAdapter.Direction.Both).await
}
}
enterBarrier("blackholed-clean-partition")
runOn(node1) {
testConductor.blackhole(node2, node3, ThrottlerTransportAdapter.Direction.Both).await
}
enterBarrier("blackholed-indirectly-connected")
within(10.seconds) {
awaitAssert {
runOn(node1) {
cluster.state.unreachable.map(_.address) should ===(Set(node2, node3, node4, node5).map(node(_).address))
}
runOn(node2) {
cluster.state.unreachable.map(_.address) should ===(Set(node3, node4, node5).map(node(_).address))
}
runOn(node3) {
cluster.state.unreachable.map(_.address) should ===(Set(node2, node4, node5).map(node(_).address))
}
runOn(node4, node5) {
cluster.state.unreachable.map(_.address) should ===(Set(node1, node2, node3).map(node(_).address))
}
}
}
enterBarrier("unreachable")
runOn(node1) {
within(15.seconds) {
awaitAssert {
cluster.state.members.map(_.address) should ===(Set(node(node1).address))
cluster.state.members.foreach {
_.status should ===(MemberStatus.Up)
}
}
}
}
runOn(node2, node3, node4, node5) {
// downed
awaitCond(cluster.isTerminated, max = 15.seconds)
}
enterBarrier("done")
}
}
}

View file

@ -0,0 +1,133 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter
object DownAllUnstable5NodeSpec extends MultiNodeConfig {
val node1 = role("node1")
val node2 = role("node2")
val node3 = role("node3")
val node4 = role("node4")
val node5 = role("node5")
commonConfig(ConfigFactory.parseString("""
akka {
loglevel = INFO
cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
failure-detector.acceptable-heartbeat-pause = 3s
split-brain-resolver.active-strategy = keep-majority
split-brain-resolver.stable-after = 10s
split-brain-resolver.down-all-when-unstable = 7s
run-coordinated-shutdown-when-down = off
}
# quicker reconnect
remote.retry-gate-closed-for = 1s
remote.netty.tcp.connection-timeout = 3 s
actor.provider = cluster
test.filter-leeway = 10s
}
"""))
testTransport(on = true)
}
class DownAllUnstable5NodeSpecMultiJvmNode1 extends DownAllUnstable5NodeSpec
class DownAllUnstable5NodeSpecMultiJvmNode2 extends DownAllUnstable5NodeSpec
class DownAllUnstable5NodeSpecMultiJvmNode3 extends DownAllUnstable5NodeSpec
class DownAllUnstable5NodeSpecMultiJvmNode4 extends DownAllUnstable5NodeSpec
class DownAllUnstable5NodeSpecMultiJvmNode5 extends DownAllUnstable5NodeSpec
class DownAllUnstable5NodeSpec extends MultiNodeSpec(DownAllUnstable5NodeSpec) with MultiNodeClusterSpec {
import DownAllUnstable5NodeSpec._
"A 5-node cluster with down-all-when-unstable" should {
"down all when instability continues" in {
val cluster = Cluster(system)
runOn(node1) {
cluster.join(cluster.selfAddress)
}
enterBarrier("node1 joined")
runOn(node2, node3, node4, node5) {
cluster.join(node(node1).address)
}
within(10.seconds) {
awaitAssert {
cluster.state.members.size should ===(5)
cluster.state.members.foreach {
_.status should ===(MemberStatus.Up)
}
}
}
enterBarrier("Cluster formed")
// acceptable-heartbeat-pause = 3s
// stable-after = 10s
// down-all-when-unstable = 7s
runOn(node1) {
for (x <- List(node1, node2, node3); y <- List(node4, node5)) {
testConductor.blackhole(x, y, ThrottlerTransportAdapter.Direction.Both).await
}
}
enterBarrier("blackholed-clean-partition")
within(10.seconds) {
awaitAssert {
runOn(node1, node2, node3) {
cluster.state.unreachable.map(_.address) should ===(Set(node4, node5).map(node(_).address))
}
runOn(node4, node5) {
cluster.state.unreachable.map(_.address) should ===(Set(node1, node2, node3).map(node(_).address))
}
}
}
enterBarrier("unreachable-clean-partition")
// no decision yet
Thread.sleep(2000)
cluster.state.members.size should ===(5)
cluster.state.members.foreach {
_.status should ===(MemberStatus.Up)
}
runOn(node1) {
testConductor.blackhole(node2, node3, ThrottlerTransportAdapter.Direction.Both).await
}
enterBarrier("blackhole-2")
// then it takes about 5 seconds for failure detector to observe that
Thread.sleep(7000)
runOn(node1) {
testConductor.passThrough(node2, node3, ThrottlerTransportAdapter.Direction.Both).await
}
enterBarrier("passThrough-2")
// now it should have been unstable for more than 17 seconds
// all downed
awaitCond(cluster.isTerminated, max = 15.seconds)
enterBarrier("done")
}
}
}

View file

@ -0,0 +1,111 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter
object IndirectlyConnected3NodeSpec extends MultiNodeConfig {
val node1 = role("node1")
val node2 = role("node2")
val node3 = role("node3")
commonConfig(ConfigFactory.parseString("""
akka {
loglevel = INFO
cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver.active-strategy = keep-majority
split-brain-resolver.stable-after = 6s
run-coordinated-shutdown-when-down = off
}
actor.provider = cluster
test.filter-leeway = 10s
}
"""))
testTransport(on = true)
}
class IndirectlyConnected3NodeSpecMultiJvmNode1 extends IndirectlyConnected3NodeSpec
class IndirectlyConnected3NodeSpecMultiJvmNode2 extends IndirectlyConnected3NodeSpec
class IndirectlyConnected3NodeSpecMultiJvmNode3 extends IndirectlyConnected3NodeSpec
class IndirectlyConnected3NodeSpec extends MultiNodeSpec(IndirectlyConnected3NodeSpec) with MultiNodeClusterSpec {
import IndirectlyConnected3NodeSpec._
"A 3-node cluster" should {
"avoid a split brain when two unreachable but can talk via third" in {
val cluster = Cluster(system)
runOn(node1) {
cluster.join(cluster.selfAddress)
}
enterBarrier("node1 joined")
runOn(node2, node3) {
cluster.join(node(node1).address)
}
within(10.seconds) {
awaitAssert {
cluster.state.members.size should ===(3)
cluster.state.members.foreach {
_.status should ===(MemberStatus.Up)
}
}
}
enterBarrier("Cluster formed")
runOn(node1) {
testConductor.blackhole(node2, node3, ThrottlerTransportAdapter.Direction.Both).await
}
enterBarrier("Blackholed")
within(10.seconds) {
awaitAssert {
runOn(node3) {
cluster.state.unreachable.map(_.address) should ===(Set(node(node2).address))
}
runOn(node2) {
cluster.state.unreachable.map(_.address) should ===(Set(node(node3).address))
}
runOn(node1) {
cluster.state.unreachable.map(_.address) should ===(Set(node(node3).address, node(node2).address))
}
}
}
enterBarrier("unreachable")
runOn(node1) {
within(15.seconds) {
awaitAssert {
cluster.state.members.map(_.address) should ===(Set(node(node1).address))
cluster.state.members.foreach {
_.status should ===(MemberStatus.Up)
}
}
}
}
runOn(node2, node3) {
// downed
awaitCond(cluster.isTerminated, max = 15.seconds)
}
enterBarrier("done")
}
}
}

View file

@ -0,0 +1,125 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter
object IndirectlyConnected5NodeSpec extends MultiNodeConfig {
val node1 = role("node1")
val node2 = role("node2")
val node3 = role("node3")
val node4 = role("node4")
val node5 = role("node5")
commonConfig(ConfigFactory.parseString("""
akka {
loglevel = INFO
cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver.active-strategy = keep-majority
split-brain-resolver.stable-after = 6s
run-coordinated-shutdown-when-down = off
}
actor.provider = cluster
test.filter-leeway = 10s
}
"""))
testTransport(on = true)
}
class IndirectlyConnected5NodeSpecMultiJvmNode1 extends IndirectlyConnected5NodeSpec
class IndirectlyConnected5NodeSpecMultiJvmNode2 extends IndirectlyConnected5NodeSpec
class IndirectlyConnected5NodeSpecMultiJvmNode3 extends IndirectlyConnected5NodeSpec
class IndirectlyConnected5NodeSpecMultiJvmNode4 extends IndirectlyConnected5NodeSpec
class IndirectlyConnected5NodeSpecMultiJvmNode5 extends IndirectlyConnected5NodeSpec
class IndirectlyConnected5NodeSpec extends MultiNodeSpec(IndirectlyConnected5NodeSpec) with MultiNodeClusterSpec {
import IndirectlyConnected5NodeSpec._
"A 5-node cluster" should {
"avoid a split brain when indirectly connected combined with clean partition" in {
val cluster = Cluster(system)
runOn(node1) {
cluster.join(cluster.selfAddress)
}
enterBarrier("node1 joined")
runOn(node2, node3, node4, node5) {
cluster.join(node(node1).address)
}
within(10.seconds) {
awaitAssert {
cluster.state.members.size should ===(5)
cluster.state.members.foreach {
_.status should ===(MemberStatus.Up)
}
}
}
enterBarrier("Cluster formed")
runOn(node1) {
for (x <- List(node1, node2, node3); y <- List(node4, node5)) {
testConductor.blackhole(x, y, ThrottlerTransportAdapter.Direction.Both).await
}
}
enterBarrier("blackholed-clean-partition")
runOn(node1) {
testConductor.blackhole(node2, node3, ThrottlerTransportAdapter.Direction.Both).await
}
enterBarrier("blackholed-indirectly-connected")
within(10.seconds) {
awaitAssert {
runOn(node1) {
cluster.state.unreachable.map(_.address) should ===(Set(node2, node3, node4, node5).map(node(_).address))
}
runOn(node2) {
cluster.state.unreachable.map(_.address) should ===(Set(node3, node4, node5).map(node(_).address))
}
runOn(node3) {
cluster.state.unreachable.map(_.address) should ===(Set(node2, node4, node5).map(node(_).address))
}
runOn(node4, node5) {
cluster.state.unreachable.map(_.address) should ===(Set(node1, node2, node3).map(node(_).address))
}
}
}
enterBarrier("unreachable")
runOn(node1) {
within(15.seconds) {
awaitAssert {
cluster.state.members.map(_.address) should ===(Set(node(node1).address))
cluster.state.members.foreach {
_.status should ===(MemberStatus.Up)
}
}
}
}
runOn(node2, node3, node4, node5) {
// downed
awaitCond(cluster.isTerminated, max = 15.seconds)
}
enterBarrier("done")
}
}
}

View file

@ -0,0 +1,189 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.cluster.MemberStatus
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter
import com.typesafe.config.ConfigFactory
import akka.cluster.MultiNodeClusterSpec
import akka.coordination.lease.TestLease
import akka.coordination.lease.TestLeaseExt
object LeaseMajority5NodeSpec extends MultiNodeConfig {
val node1 = role("node1")
val node2 = role("node2")
val node3 = role("node3")
val node4 = role("node4")
val node5 = role("node5")
commonConfig(ConfigFactory.parseString(s"""
akka {
loglevel = INFO
cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver {
active-strategy = lease-majority
stable-after = 6s
lease-majority {
lease-implementation = test-lease
acquire-lease-delay-for-minority = 1s
}
}
run-coordinated-shutdown-when-down = off
}
actor.provider = cluster
test.filter-leeway = 10s
}
test-lease {
lease-class = ${classOf[TestLease].getName}
heartbeat-interval = 1s
heartbeat-timeout = 120s
lease-operation-timeout = 3s
}
"""))
testTransport(on = true)
}
class LeaseMajority5NodeSpecMultiJvmNode1 extends LeaseMajority5NodeSpec
class LeaseMajority5NodeSpecMultiJvmNode2 extends LeaseMajority5NodeSpec
class LeaseMajority5NodeSpecMultiJvmNode3 extends LeaseMajority5NodeSpec
class LeaseMajority5NodeSpecMultiJvmNode4 extends LeaseMajority5NodeSpec
class LeaseMajority5NodeSpecMultiJvmNode5 extends LeaseMajority5NodeSpec
class LeaseMajority5NodeSpec extends MultiNodeSpec(LeaseMajority5NodeSpec) with MultiNodeClusterSpec {
import LeaseMajority5NodeSpec._
private val testLeaseName = "LeaseMajority5NodeSpec-akka-sbr"
def sortByAddress(roles: RoleName*): List[RoleName] = {
/**
* Sort the roles in the address order used by the cluster node ring.
*/
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
import akka.cluster.Member.addressOrdering
def compare(x: RoleName, y: RoleName): Int = addressOrdering.compare(node(x).address, node(y).address)
}
roles.toList.sorted
}
def leader(roles: RoleName*): RoleName =
sortByAddress(roles: _*).head
"LeaseMajority in a 5-node cluster" should {
"setup cluster" in {
runOn(node1) {
cluster.join(cluster.selfAddress)
}
enterBarrier("node1 joined")
runOn(node2, node3, node4, node5) {
cluster.join(node(node1).address)
}
within(10.seconds) {
awaitAssert {
cluster.state.members.size should ===(5)
cluster.state.members.foreach {
_.status should ===(MemberStatus.Up)
}
}
}
enterBarrier("Cluster formed")
}
"keep the side that can acquire the lease" in {
val lease = TestLeaseExt(system).getTestLease(testLeaseName)
val leaseProbe = lease.probe
runOn(node1, node2, node3) {
lease.setNextAcquireResult(Future.successful(true))
}
runOn(node4, node5) {
lease.setNextAcquireResult(Future.successful(false))
}
enterBarrier("lease-in-place")
runOn(node1) {
for (x <- List(node1, node2, node3); y <- List(node4, node5)) {
testConductor.blackhole(x, y, ThrottlerTransportAdapter.Direction.Both).await
}
}
enterBarrier("blackholed-clean-partition")
runOn(node1, node2, node3) {
within(20.seconds) {
awaitAssert {
cluster.state.members.size should ===(3)
}
}
runOn(leader(node1, node2, node3)) {
leaseProbe.expectMsgType[TestLease.AcquireReq]
// after 2 * stable-after
leaseProbe.expectMsgType[TestLease.ReleaseReq](14.seconds)
}
}
runOn(node4, node5) {
within(20.seconds) {
awaitAssert {
cluster.isTerminated should ===(true)
}
runOn(leader(node4, node5)) {
leaseProbe.expectMsgType[TestLease.AcquireReq]
}
}
}
enterBarrier("downed-and-removed")
leaseProbe.expectNoMessage(1.second)
enterBarrier("done-1")
}
}
"keep the side that can acquire the lease, round 2" in {
val lease = TestLeaseExt(system).getTestLease(testLeaseName)
runOn(node1) {
lease.setNextAcquireResult(Future.successful(true))
}
runOn(node2, node3) {
lease.setNextAcquireResult(Future.successful(false))
}
enterBarrier("lease-in-place-2")
runOn(node1) {
for (x <- List(node1); y <- List(node2, node3)) {
testConductor.blackhole(x, y, ThrottlerTransportAdapter.Direction.Both).await
}
}
enterBarrier("blackholed-clean-partition-2")
runOn(node1) {
within(20.seconds) {
awaitAssert {
cluster.state.members.size should ===(1)
}
}
}
runOn(node2, node3) {
within(20.seconds) {
awaitAssert {
cluster.isTerminated should ===(true)
}
}
}
enterBarrier("done-2")
}
}

File diff suppressed because it is too large

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sbr
import akka.actor.Address
import akka.cluster.ClusterSettings
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.cluster.MemberStatus.Up
import akka.cluster.MemberStatus.WeaklyUp
import akka.cluster.UniqueAddress
/**
* Needed since the Member constructor is akka private
*/
object TestAddresses {
private def dcRole(dc: ClusterSettings.DataCenter): String =
ClusterSettings.DcRolePrefix + dc
val defaultDataCenter = ClusterSettings.DefaultDataCenter
private def defaultDcRole = dcRole(defaultDataCenter)
val addressA = Address("akka.tcp", "sys", "a", 2552)
val memberA = new Member(UniqueAddress(addressA, 0L), 5, Up, Set("role3", defaultDcRole))
val memberB =
new Member(UniqueAddress(addressA.copy(host = Some("b")), 0L), 4, Up, Set("role1", "role3", defaultDcRole))
val memberC = new Member(UniqueAddress(addressA.copy(host = Some("c")), 0L), 3, Up, Set("role2", defaultDcRole))
val memberD =
new Member(UniqueAddress(addressA.copy(host = Some("d")), 0L), 2, Up, Set("role1", "role2", "role3", defaultDcRole))
val memberE = new Member(UniqueAddress(addressA.copy(host = Some("e")), 0L), 1, Up, Set(defaultDcRole))
val memberF = new Member(UniqueAddress(addressA.copy(host = Some("f")), 0L), 5, Up, Set(defaultDcRole))
val memberG = new Member(UniqueAddress(addressA.copy(host = Some("g")), 0L), 6, Up, Set(defaultDcRole))
val memberAWeaklyUp = new Member(memberA.uniqueAddress, Int.MaxValue, WeaklyUp, memberA.roles)
val memberBWeaklyUp = new Member(memberB.uniqueAddress, Int.MaxValue, WeaklyUp, memberB.roles)
def dcMember(dc: ClusterSettings.DataCenter, m: Member): Member =
new Member(
m.uniqueAddress,
m.upNumber,
m.status,
m.roles.filterNot(_.startsWith(ClusterSettings.DcRolePrefix)) + dcRole(dc))
def dataCenter(dc: ClusterSettings.DataCenter, members: Member*): Set[Member] =
members.toSet[Member].map(m => dcMember(dc, m))
def joining(m: Member): Member = Member(m.uniqueAddress, m.roles)
def leaving(m: Member): Member = m.copy(MemberStatus.Leaving)
def exiting(m: Member): Member = leaving(m).copy(MemberStatus.Exiting)
def downed(m: Member): Member = m.copy(MemberStatus.Down)
}

View file

@ -2,18 +2,22 @@
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
package akka.coordination.lease
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ Future, Promise }
import scala.concurrent.Future
import scala.concurrent.Promise
import com.typesafe.config.ConfigFactory
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.coordination.lease.LeaseSettings
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.coordination.lease.scaladsl.Lease
import akka.event.Logging
import akka.testkit.TestProbe
@ -47,9 +51,9 @@ object TestLease {
final case class AcquireReq(owner: String)
final case class ReleaseReq(owner: String)
val config = ConfigFactory.parseString("""
val config = ConfigFactory.parseString(s"""
test-lease {
lease-class = akka.cluster.TestLease
lease-class = ${classOf[TestLease].getName}
}
""".stripMargin)
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
package akka.coordination.lease
import java.util.concurrent.atomic.AtomicReference
@ -19,8 +19,6 @@ import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.Props
import akka.cluster.TestLeaseActor.{ Acquire, Create, Release }
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.scaladsl.Lease
import akka.event.Logging
import akka.pattern.ask
@ -96,6 +94,9 @@ class TestLeaseActorClientExt(val system: ExtendedActorSystem) extends Extension
}
class TestLeaseActorClient(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) {
import TestLeaseActor.Acquire
import TestLeaseActor.Create
import TestLeaseActor.Release
private val log = Logging(system, getClass)
val leaseActor = TestLeaseActorClientExt(system).getLeaseActor()

View file

@ -67,7 +67,7 @@ Environments such as Kubernetes send a SIGTERM, however if the JVM is wrapped wi
In case of network failures it may still be necessary to set the node's status to Down in order to complete the removal.
@ref:[Cluster Downing](../typed/cluster.md#downing) details downing nodes and downing providers.
[Split Brain Resolver](https://doc.akka.io/docs/akka-enhancements/current/split-brain-resolver.html) can be used to ensure
@ref:[Split Brain Resolver](../split-brain-resolver.md) can be used to ensure
the cluster continues to function during network partitions and node failures. For example
if there is an unreachability problem Split Brain Resolver would make a decision based on the configured downing strategy.

View file

@ -26,6 +26,7 @@ An Akka Persistence journal and snapshot store backed by Couchbase.
* [Akka Cluster Bootstrap](https://doc.akka.io/docs/akka-management/current/bootstrap/) helps bootstrapping an Akka cluster using Akka Discovery.
* [Akka Management Cluster HTTP](https://doc.akka.io/docs/akka-management/current/cluster-http-management.html) provides HTTP endpoints for introspecting and managing Akka clusters.
* [Akka Discovery for Kubernetes, Consul, Marathon, and AWS](https://doc.akka.io/docs/akka-management/current/discovery/)
* [Kubernetes Lease](https://doc.akka.io/docs/akka-management/current/kubernetes-lease.html)
## [Akka gRPC](https://doc.akka.io/docs/akka-grpc/current/)
@ -33,8 +34,6 @@ Akka gRPC provides support for building streaming gRPC servers and clients on to
## Akka Resilience Enhancements
* [Akka Split Brain Resolver](https://doc.akka.io/docs/akka-enhancements/current/split-brain-resolver.html)
* [Kubernetes Lease](https://doc.akka.io/docs/akka-enhancements/current/kubernetes-lease.html)
* [Akka Thread Starvation Detector](https://doc.akka.io/docs/akka-enhancements/current/starvation-detector.html)
* [Akka Configuration Checker](https://doc.akka.io/docs/akka-enhancements/current/config-checker.html)
* [Akka Diagnostics Recorder](https://doc.akka.io/docs/akka-enhancements/current/diagnostics-recorder.html)
@ -44,7 +43,6 @@ Akka gRPC provides support for building streaming gRPC servers and clients on to
* [Akka Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html)
* [Akka GDPR for Persistence](https://doc.akka.io/docs/akka-enhancements/current/gdpr/index.html)
## Community Projects
Akka has a vibrant and passionate user community, the members of which have created many independent projects using Akka as well as extensions to it. See [Community Projects](https://akka.io/community/).

View file

@ -35,10 +35,10 @@ Any lease implementation should provide the following guarantees:
To acquire a lease:
Scala
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-usage }
: @@snip [LeaseDocSpec.scala](/akka-docs/src/test/scala/docs/coordination/LeaseDocSpec.scala) { #lease-usage }
Java
: @@snip [LeaseDocTest.java](/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java) { #lease-usage }
: @@snip [LeaseDocTest.java](/akka-docs/src/test/java/jdocs/coordination/LeaseDocTest.java) { #lease-usage }
Acquiring a lease returns a @scala[Future]@java[CompletionStage] as lease implementations typically are implemented
via a third party system such as the Kubernetes API server or Zookeeper.
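As a minimal sketch of what this looks like in code (assuming a lease implementation has been configured
under a config path such as `docs-lease`, and with illustrative lease and owner names), a lease can be
obtained and acquired like this:
```
import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }

val system = ActorSystem("example")
// lease name, config path and owner name are illustrative values
val lease: Lease = LeaseProvider(system).getLease("my-service-lock", "docs-lease", "owner-1")
// completes with true if the lease was acquired, false if it is held by another owner
val acquired: Future[Boolean] = lease.acquire()
```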
@ -53,10 +53,10 @@ It is important to pick a lease name that will be unique for your use case. If a
in a Cluster the cluster host port can be used:
Scala
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #cluster-owner }
: @@snip [LeaseDocSpec.scala](/akka-docs/src/test/scala/docs/coordination/LeaseDocSpec.scala) { #cluster-owner }
Java
: @@snip [LeaseDocTest.scala](/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java) { #cluster-owner }
: @@snip [LeaseDocTest.scala](/akka-docs/src/test/java/jdocs/coordination/LeaseDocTest.java) { #cluster-owner }
For use cases where multiple different leases are used on the same node, something unique must be added to the name. For example
a lease can be used with Cluster Sharding and in this case the shard Id is included in the lease name for each shard.
@ -77,7 +77,7 @@ Leases can be used for @ref[Cluster Singletons](cluster-singleton.md#lease) and
## Lease implementations
* [Kubernetes API](https://doc.akka.io/docs/akka-enhancements/current/kubernetes-lease.html)
* [Kubernetes API](https://doc.akka.io/docs/akka-management/current/kubernetes-lease.html)
## Implementing a lease
@ -85,10 +85,10 @@ Implementations should extend
the @scala[`akka.coordination.lease.scaladsl.Lease`]@java[`akka.coordination.lease.javadsl.Lease`]
Scala
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-example }
: @@snip [LeaseDocSpec.scala](/akka-docs/src/test/scala/docs/coordination/LeaseDocSpec.scala) { #lease-example }
Java
: @@snip [LeaseDocTest.scala](/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java) { #lease-example }
: @@snip [LeaseDocTest.java](/akka-docs/src/test/java/jdocs/coordination/LeaseDocTest.java) { #lease-example }
The methods should provide the following guarantees:
@ -109,10 +109,10 @@ The lease implementation should have support for the following properties where
This configuration location is passed into `getLease`.
Scala
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-config }
: @@snip [LeaseDocSpec.scala](/akka-docs/src/test/scala/docs/coordination/LeaseDocSpec.scala) { #lease-config }
Java
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-config }
: @@snip [LeaseDocSpec.scala](/akka-docs/src/test/scala/docs/coordination/LeaseDocSpec.scala) { #lease-config }

View file

@ -0,0 +1,481 @@
# Split Brain Resolver
When operating an Akka cluster you must consider how to handle
[network partitions](http://en.wikipedia.org/wiki/Network_partition) (a.k.a. split brain scenarios)
and machine crashes (including JVM and hardware failures). This is crucial for correct behavior if
you use @ref:[Cluster Singleton](typed/cluster-singleton.md) or @ref:[Cluster Sharding](typed/cluster-sharding.md),
especially together with Akka Persistence.
## Module info
The Akka Split Brain Resolver is part of `akka-cluster` and you probably already have that
dependency included. Otherwise, add the following dependency to your project:
@@dependency[sbt,Maven,Gradle] {
group=com.typesafe.akka
artifact=akka-cluster_$scala.binary.version$
version=$akka.version$
}
@@project-info{ projectId="akka-cluster" }
## Enable the Split Brain Resolver
You need to enable the Split Brain Resolver by configuring it as downing provider in the configuration of
the `ActorSystem` (`application.conf`):
```
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
```
You should also consider the different available @ref:[downing strategies](#strategies).
## The Problem
A fundamental problem in distributed systems is that network partitions (split brain scenarios) and
machine crashes are indistinguishable for the observer, i.e. a node can observe that there is a problem
with another node, but it cannot tell if it has crashed and will never be available again or if there is
a network issue that might or might not heal again after a while. Temporary and permanent failures are
indistinguishable because decisions must be made in finite time, and there always exists a temporary
failure that lasts longer than the time limit for the decision.
A third type of problem is if a process is unresponsive, e.g. because of overload, CPU starvation or
long garbage collection pauses. This is also indistinguishable from network partitions and crashes.
The only signal we have for decision is "no reply in given time for heartbeats" and this means that
phenomena causing delays or lost heartbeats are indistinguishable from each other and must be
handled in the same way.
When there is a crash, we would like to remove the affected node immediately from the cluster membership.
When there is a network partition or unresponsive process we would like to wait for a while in the hope
that it is a transient problem that will heal again, but at some point, we must give up and continue with
the nodes on one side of the partition and shut down nodes on the other side. Also, certain features are
not fully available during partitions so it might not matter whether the partition is transient or not if
it just takes too long. Those two goals are in conflict with each other and there is a trade-off
between how quickly we can remove a crashed node and premature action on transient network partitions.
This is a difficult problem to solve given that the nodes on the different sides of the network partition
cannot communicate with each other. We must ensure that both sides can make this decision by themselves and
that they take the same decision about which part will keep running and which part will shut itself down.
Another type of problem that makes it difficult to see the "right" picture is when some nodes are not fully
connected and cannot communicate directly with each other, but information can be disseminated between them via
other nodes.
The Akka cluster has a failure detector that will notice network partitions and machine crashes (but it
cannot distinguish the two). It uses periodic heartbeat messages to check if other nodes are available
and healthy. These observations by the failure detector are referred to as a node being *unreachable*
and it may become *reachable* again if the failure detector observes that it can communicate with it again.
The failure detector in itself is not enough for making the right decision in all situations.
The naive approach is to remove an unreachable node from the cluster membership after a timeout.
This works great for crashes and short transient network partitions, but not for long network
partitions. Both sides of the network partition will see the other side as unreachable and
after a while remove it from its cluster membership. Since this happens on both sides the result
is that two separate disconnected clusters have been created.
This approach is provided by the opt-in (off by default) auto-down feature in the OSS version of
Akka Cluster.
If you use the timeout based auto-down feature in combination with Cluster Singleton or Cluster Sharding
that would mean that two singleton instances or two sharded entities with the same identifier would be
running, one in each cluster.
For example, when used together with Akka Persistence, that could result in two instances of a
persistent actor with the same `persistenceId` running and writing concurrently to the
same stream of persistent events, which will have fatal consequences when replaying these events.
The default setting in Akka Cluster is to not remove unreachable nodes automatically and
the recommendation is that the decision of what to
do should be taken by a human operator or an external monitoring system. This is a valid solution,
but not very convenient if you do not have this staff or external system for other reasons.
If the unreachable nodes are not downed at all they will still be part of the cluster membership.
This means that Cluster Singleton and Cluster Sharding will not fail over to another node. While there
are unreachable nodes new nodes that are joining the cluster will not be promoted to full worthy
members (with status Up). Similarly, leaving members will not be removed until all unreachable
nodes have been resolved. In other words, keeping unreachable members for an unbounded time is
undesirable.
With that introduction of the problem domain, it is time to look at the provided strategies for
handling network partitions, unresponsive nodes and crashed nodes.
## Strategies
By default the @ref:[Keep Majority](#keep-majority) strategy will be used because it works well for
most systems. However, it's worth considering the other available strategies and picking one that fits
the characteristics of your system. For example, in a Kubernetes environment the @ref:[Lease](#lease) strategy
can be a good choice.
Every strategy has a failure scenario where it makes a "wrong" decision. This section describes the different
strategies and guidelines of when to use what.
When there is uncertainty it chooses to down more nodes than necessary, or even all nodes.
Therefore the Split Brain Resolver should always be combined with a mechanism to automatically start up nodes that
have been shut down, and join them to the existing cluster or form a new cluster again.
You enable a strategy with the configuration property `akka.cluster.split-brain-resolver.active-strategy`.
### Stable after
All strategies are inactive until the cluster membership and the information about unreachable nodes
have been stable for a certain time period. Continuously adding more nodes while there is a network
partition does not influence this timeout, since the status of those nodes will not be changed to Up
while there are unreachable nodes. Joining nodes are not counted in the logic of the strategies.
@@snip [reference.conf](/akka-cluster/src/main/resources/reference.conf) { #split-brain-resolver }
Set `akka.cluster.split-brain-resolver.stable-after` to a shorter duration to have quicker removal of crashed nodes,
at the price of risking too early action on transient network partitions that otherwise would have healed. Do not
set this to a shorter duration than the membership dissemination time in the cluster, which depends
on the cluster size. Recommended minimum duration for different cluster sizes:
|cluster size | stable-after|
|-------------|-------------|
|5 | 7 s |
|10 | 10 s|
|20 | 13 s|
|50 | 17 s|
|100 | 20 s|
|1000 | 30 s|
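As a sketch, for a cluster of around 10 nodes the recommendation above corresponds to configuration
along these lines (the value is illustrative and should be adjusted to your cluster size):
```
akka.cluster.split-brain-resolver {
  # minimum recommended value for a cluster of around 10 nodes, see the table above
  stable-after = 10s
}
```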
The different strategies may have additional settings that are described below.
@@@ note
It is important that you use the same configuration on all nodes.
@@@
The side of the split that decides to shut itself down will use the cluster *down* command
to initiate the removal of a cluster member. When that has been spread among the reachable nodes
it will be removed from the cluster membership.
It's good to terminate the `ActorSystem` and exit the JVM when the node is removed from the cluster.
That is handled by @ref:[Coordinated Shutdown](coordinated-shutdown.md)
but to exit the JVM it's recommended that you enable:
```
akka.coordinated-shutdown.exit-jvm = on
```
@@@ note
Some legacy containers may block calls to System.exit(..) and you may have to find an alternate
way to shut the app down. For example, when running Akka on top of a Spring / Tomcat setup, you
could replace the call to `System.exit(..)` with a call to Spring's `ApplicationContext.close()` method
(or with an HTTP call to Tomcat Manager's API to un-deploy the app).
@@@
### Keep Majority
The strategy named `keep-majority` will down the unreachable nodes if the current node is in
the majority part based on the last known membership information. Otherwise down the reachable nodes,
i.e. the own part. If the parts are of equal size the part containing the node with the lowest
address is kept.
This strategy is a good choice when the number of nodes in the cluster changes dynamically and you
therefore cannot use `static-quorum`.
This strategy also handles the edge case that may occur when there are membership changes at the same
time as the network partition occurs. For example, the status of two members are changed to `Up`
on one side but that information is not disseminated to the other side before the connection is broken.
Then one side sees two more nodes and both sides might consider themselves having a majority. It will
detect this situation and make the safe decision to down all nodes on the side that could be in minority
if the joining nodes were changed to `Up` on the other side. Note that this has the drawback that
if the joining nodes were not changed to `Up` and becoming a majority on the other side then each part
will shut down itself, terminating the whole cluster.
Note that if there are more than two partitions and none is in majority each part will shut down
itself, terminating the whole cluster.
If more than half of the nodes crash at the same time the other running nodes will down themselves
because they think that they are not in majority, and thereby the whole cluster is terminated.
The decision can be based on nodes with a configured `role` instead of all nodes in the cluster.
This can be useful when some types of nodes are more valuable than others. You might for example
have some nodes responsible for persistent data and some nodes with stateless worker services.
Then it is probably more important to keep as many persistent data nodes as possible even though
it means shutting down more worker nodes.
Configuration:
```
akka.cluster.split-brain-resolver.active-strategy=keep-majority
```
@@snip [reference.conf](/akka-cluster/src/main/resources/reference.conf) { #keep-majority }
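As a sketch, basing the decision on nodes with a specific role could look like this (the role name
`persistent-data` is only an illustration):
```
akka.cluster.split-brain-resolver {
  active-strategy = keep-majority
  keep-majority {
    # only nodes with this role are counted in the majority decision
    role = "persistent-data"
  }
}
```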
### Static Quorum
The strategy named `static-quorum` will down the unreachable nodes if the number of remaining
nodes is greater than or equal to a configured `quorum-size`. Otherwise, it will down the reachable nodes,
i.e. it will shut down that side of the partition. In other words, the `quorum-size` defines the minimum
number of nodes that the cluster must have to be operational.
This strategy is a good choice when you have a fixed number of nodes in the cluster, or when you can
define a fixed number of nodes with a certain role.
For example, in a 9 node cluster you will configure the `quorum-size` to 5. If there is a network split
of 4 and 5 nodes the side with 5 nodes will survive and the other 4 nodes will be downed. After that,
in the 5 node cluster, no more failures can be handled, because the remaining cluster size would be
less than 5. In the case of another failure in that 5 node cluster all nodes will be downed.
Therefore it is important that you join new nodes when old nodes have been removed.
Another consequence of this is that if there are unreachable nodes when starting up the cluster,
before reaching this limit, the cluster may shut itself down immediately. This is not an issue
if you start all nodes at approximately the same time or use the `akka.cluster.min-nr-of-members`
to define the required number of members before the leader changes the status of 'Joining' members to 'Up'.
You can tune the timeout after which downing decisions are made using the `stable-after` setting.
You should not add more members to the cluster than **quorum-size * 2 - 1**. A warning is logged
if this recommendation is violated. If the exceeded cluster size remains when an SBR decision is
needed it will down all nodes because otherwise there is a risk that both sides may down each
other and thereby form two separate clusters.
For rolling updates it's best to leave the cluster gracefully via
@ref:[Coordinated Shutdown](coordinated-shutdown.md) (SIGTERM).
For successful leaving SBR will not be used (no downing) but if there is an unreachability problem
at the same time as the rolling update is in progress there could be an SBR decision. To avoid exceeding
the total number of members limit during the rolling update, it's recommended to
leave and fully remove one node before adding a new one, when using `static-quorum`.
If the cluster is split into 3 (or more) parts, each part that is smaller than the configured `quorum-size`
will down itself and possibly shut down the whole cluster.
If more nodes than the configured `quorum-size` crash at the same time the other running nodes
will down themselves because they think that they are not in the majority, and thereby the whole
cluster is terminated.
The decision can be based on nodes with a configured `role` instead of all nodes in the cluster.
This can be useful when some types of nodes are more valuable than others. You might, for example,
have some nodes responsible for persistent data and some nodes with stateless worker services.
Then it is probably more important to keep as many persistent data nodes as possible even though
it means shutting down more worker nodes.
There is another use of the `role` as well. By defining a `role` for a few (e.g. 7) stable
nodes in the cluster and using that in the configuration of `static-quorum` you will be able
to dynamically add and remove other nodes without this role and still have good decisions of what
nodes to keep running and what nodes to shut down in the case of network partitions. The advantage
of this approach compared to `keep-majority` (described above) is that you *do not* risk splitting
the cluster into two separate clusters, i.e. *a split brain*. You must still obey the rule of not
starting too many nodes with this `role` as described above. It also suffers the risk of shutting
down all nodes if there is a failure when there are not enough nodes with this `role` remaining
in the cluster, as described above.
Configuration:
```
akka.cluster.split-brain-resolver.active-strategy=static-quorum
```
@@snip [reference.conf](/akka-cluster/src/main/resources/reference.conf) { #static-quorum }
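As a sketch, the 9 node example above corresponds to configuration along these lines (values are illustrative):
```
akka.cluster.split-brain-resolver {
  active-strategy = static-quorum
  static-quorum {
    # minimum number of nodes that the cluster must have to be operational
    quorum-size = 5
  }
}
```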
### Keep Oldest
The strategy named `keep-oldest` will down the part that does not contain the oldest
member. The oldest member is interesting because the active Cluster Singleton instance
is running on the oldest member.
There is one exception to this rule if `down-if-alone` is configured to `on`.
Then, if the oldest node has partitioned from all other nodes the oldest will down itself
and keep all other nodes running. The strategy will not down the single oldest node when
it is the only remaining node in the cluster.
Note that if the oldest node crashes the others will remove it from the cluster
when `down-if-alone` is `on`, otherwise they will down themselves if the
oldest node crashes, i.e. shut down the whole cluster together with the oldest node.
This strategy is good to use if you use Cluster Singleton and do not want to shut down the node
where the singleton instance runs. If the oldest node crashes a new singleton instance will be
started on the next oldest node. The drawback is that the strategy may keep only a few nodes
in a large cluster. For example, if one part with the oldest consists of 2 nodes and the
other part consists of 98 nodes then it will keep 2 nodes and shut down 98 nodes.
This strategy also handles the edge case that may occur when there are membership changes at the same
time as the network partition occurs. For example, the status of the oldest member is changed to `Exiting`
on one side but that information is not disseminated to the other side before the connection is broken.
It will detect this situation and make the safe decision to down all nodes on the side that sees the oldest as
`Leaving`. Note that this has the drawback that if the oldest was `Leaving` and not changed to `Exiting` then
each part will shut down itself, terminating the whole cluster.
The decision can be based on nodes with a configured `role` instead of all nodes in the cluster,
i.e. using the oldest member (singleton) within the nodes with that role.
Configuration:
```
akka.cluster.split-brain-resolver.active-strategy=keep-oldest
```
@@snip [reference.conf](/akka-cluster/src/main/resources/reference.conf) { #keep-oldest }
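As a sketch, enabling the strategy together with the `down-if-alone` behavior described above could
look like this:
```
akka.cluster.split-brain-resolver {
  active-strategy = keep-oldest
  keep-oldest {
    # down the oldest node itself if it has been isolated from all other nodes
    down-if-alone = on
  }
}
```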
### Down All
The strategy named `down-all` will down all nodes.
This strategy can be a safe alternative if the network environment is highly unstable with unreachability observations
that can't be fully trusted, including frequent occurrences of @ref:[indirectly connected nodes](#indirectly-connected-nodes).
Due to the instability there is an increased risk of different information on different sides of partitions and
therefore the other strategies may result in conflicting decisions. In such environments it can be better to shutdown
all nodes and start up a new fresh cluster.
Shutting down all nodes means that the system will be completely unavailable until nodes have been restarted and
formed a new cluster. This strategy is not recommended for large clusters (> 10 nodes) because any minor problem
will shutdown all nodes, and that is more likely to happen in larger clusters since there are more nodes that
may fail.
See also @ref[Down all when unstable](#down-all-when-unstable) and @ref:[indirectly connected nodes](#indirectly-connected-nodes).
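Like the other strategies, it is enabled with the `active-strategy` configuration property:
```
akka.cluster.split-brain-resolver.active-strategy=down-all
```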
### Lease
The strategy named `lease-majority` uses a distributed lease (lock) to decide which nodes are allowed to
survive. Only one SBR instance can acquire the lease and make the decision to remain up. The other side will
not be able to acquire the lease and will therefore down itself.
A best effort is made to keep the side that has the most nodes, i.e. the majority side. This is achieved by adding a delay
before trying to acquire the lease on the minority side.
There is currently one supported implementation of the lease which is backed by a
[Custom Resource Definition (CRD)](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
in Kubernetes. It is described in the [Kubernetes Lease](https://doc.akka.io/docs/akka-management/current/kubernetes-lease.html)
documentation.
This strategy is very safe since coordination is added by an external arbiter. The trade-off compared to other
strategies is that it requires additional infrastructure for implementing the lease and it reduces the availability
of a decision to that of the system backing the lease store.
Similar to other strategies it is important that decisions are not deferred for too long because the nodes that couldn't
acquire the lease must decide to down themselves, see @ref[Down all when unstable](#down-all-when-unstable).
In some cases the lease will be unavailable when needed for a decision from all SBR instances, e.g. because it is
on another side of a network partition, and then all nodes will be downed.
Configuration:
```
akka {
cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver {
active-strategy = "lease-majority"
lease-majority {
lease-implementation = "akka.lease.kubernetes"
}
}
}
}
```
@@snip [reference.conf](/akka-cluster/src/main/resources/reference.conf) { #lease-majority }
See also the configuration and additional dependency in [Kubernetes Lease](https://doc.akka.io/docs/akka-management/current/kubernetes-lease.html).
## Indirectly connected nodes
In a malfunctioning network there can be situations where nodes are observed as unreachable via some network
links but they are still indirectly connected via other nodes, i.e. it's not a clean network partition (or node crash).
When this situation is detected the Split Brain Resolver will keep the fully connected nodes and down all the indirectly
connected nodes.
If there is a combination of indirectly connected nodes and a clean network partition it will combine the
above decision with the ordinary decision, e.g. keep majority, after excluding suspicious failure detection
observations.
## Down all when unstable
When reachability observations by the failure detector are changed the SBR decisions
are deferred until there are no changes within the `stable-after` duration.
If this continues for too long it might be an indication of an unstable system/network
and it could result in delayed or conflicting decisions on separate sides of a network
partition.
As a precaution for that scenario all nodes are downed if no decision is made within
`stable-after + down-all-when-unstable` from the first unreachability event.
The measurement is reset if all unreachable nodes have been healed, downed or removed, or
if there are no changes within `stable-after * 2`.
This is enabled by default for all strategies and by default the duration is derived to
be 3/4 of `stable-after`.
The property below defines for how long the changes are acceptable to
continue after `stable-after`, or it can be set to `off` to disable this feature.
```
akka.cluster.split-brain-resolver {
down-all-when-unstable = 15s
stable-after = 20s
}
```
@@@ warning
It is recommended to keep `down-all-when-unstable` enabled and not set it to a longer duration than `stable-after`
(`down-removal-margin`) because that can result in delayed decisions on the side that should have been downed, e.g.
in the case of a clean network partition followed by continued instability on the side that should be downed.
That could result in members being removed from one side while still running on the other side.
@@@
## Multiple data centers
Akka Cluster has @ref:[support for multiple data centers](cluster-dc.md), where the cluster
membership is managed by each data center separately and independently of network partitions across different
data centers. The Split Brain Resolver embraces that strategy and will not count or down nodes in
another data center.
When there is a network partition across data centers the typical solution is to wait the partition out until it heals, i.e.
do nothing. Other decisions should be performed by an external monitoring tool or human operator.
## Cluster Singleton and Cluster Sharding
The purpose of Cluster Singleton and Cluster Sharding is to run at most one instance
of a given actor at any point in time. When such an instance is shut down a new instance
is supposed to be started elsewhere in the cluster. It is important that the new instance is
not started before the old instance has been stopped. This is especially important when the
singleton or the sharded instance is persistent, since there must only be one active
writer of the journaled events of a persistent actor instance.
Since the strategies on different sides of a network partition cannot communicate with each other
and they may take the decision at slightly different points in time there must be a time based
margin that makes sure that the new instance is not started before the old has been stopped.
You would like to configure this to a short duration to have quick failover, but that will increase the
risk of having multiple singleton/sharded instances running at the same time and it may take a different
amount of time to act on the decision (dissemination of the down/removal). The duration is by default
the same as the `stable-after` property (see @ref:[Stable after](#stable-after) above). It is recommended to
leave this value as is, but it can also be separately overridden with the `akka.cluster.down-removal-margin` property.
Another concern for setting this `stable-after`/`akka.cluster.down-removal-margin` is dealing with JVM pauses e.g.
garbage collection. When a node is unresponsive it is not known if it is due to a pause, overload, a crash or a
network partition. If it is a pause that lasts longer than `stable-after` * 2 it gives time for SBR to down the node
and for singletons and shards to be started on other nodes. When the node un-pauses there will be a short time before
it sees itself as down, during which singletons and sharded actors are still running. It is therefore important to understand
the max pause time your application is likely to incur and make sure it is smaller than `stable-after`.
If you choose to set a separate value for `down-removal-margin`, the recommended minimum durations for different cluster sizes are:
|cluster size | down-removal-margin|
|-------------|--------------------|
|5 | 7 s |
|10 | 10 s|
|20 | 13 s|
|50 | 17 s|
|100 | 20 s|
|1000 | 30 s|
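As a sketch, overriding the margin separately for a cluster of around 10 nodes could look like this
(only needed if you do not want to rely on the default, which is the same as `stable-after`):
```
akka.cluster.down-removal-margin = 10s
```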
### Expected Failover Time
As you have seen, there are several configured timeouts that add to the total failover latency.
With default configuration those are:
* failure detection 5 seconds
* stable-after 20 seconds
* down-removal-margin (by default the same as stable-after) 20 seconds
In total, you can expect the failover time of a singleton or sharded instance to be around 45 seconds
with default configuration. The default configuration is sized for a cluster of 100 nodes. If you have
around 10 nodes you can reduce the `stable-after` to around 10 seconds,
resulting in an expected failover time of around 25 seconds.

View file

@ -108,8 +108,7 @@ performs in such a case must be designed in a way that all concurrent leaders wo
might be impossible in general and only feasible under additional constraints). The most important case of that kind is a split
brain scenario where nodes need to be downed, either manually or automatically, to bring the cluster back to convergence.
See the [Lightbend Split Brain Resolver](https://doc.akka.io/docs/akka-enhancements/current/split-brain-resolver.html)
for an implementation of that.
The @ref:[Split Brain Resolver](../split-brain-resolver.md) is the built-in implementation of that.
Another transition that is possible without convergence is marking members as `WeaklyUp` as described in the next section.
@ -140,7 +139,7 @@ startup if a node to join have been specified in the configuration
* **leave** - tell a node to leave the cluster gracefully, normally triggered by ActorSystem or JVM shutdown through @ref[coordinated shutdown](../coordinated-shutdown.md)
* **down** - mark a node as down. This action is required to remove crashed nodes (that did not 'leave') from the cluster. It can be triggered manually, through [Cluster HTTP Management](https://doc.akka.io/docs/akka-management/current/cluster-http-management.html#put-cluster-members-address-responses), or automatically by a @ref[downing provider](cluster.md#downing) like [Split Brain Resolver](https://doc.akka.io/docs/akka-enhancements/current/split-brain-resolver.html)
* **down** - mark a node as down. This action is required to remove crashed nodes (that did not 'leave') from the cluster. It can be triggered manually, through [Cluster HTTP Management](https://doc.akka.io/docs/akka-management/current/cluster-http-management.html#put-cluster-members-address-responses), or automatically by a @ref[downing provider](cluster.md#downing) like @ref:[Split Brain Resolver](../split-brain-resolver.md)
#### Leader Actions

View file

@ -275,23 +275,22 @@ new joining members to 'Up'. The node must first become `reachable` again, or th
status of the unreachable member must be changed to `Down`. Changing status to `Down`
can be performed automatically or manually.
By default, downing must be performed manually using @ref:[HTTP](../additional/operations.md#http) or @ref:[JMX](../additional/operations.md#jmx).
We recommend that you enable the @ref:[Split Brain Resolver](../split-brain-resolver.md) that is part of the
Akka Cluster module. You enable it with configuration:
```
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
```
You should also consider the different available @ref:[downing strategies](../split-brain-resolver.md#strategies).
If a downing provider is not configured downing must be performed manually using
@ref:[HTTP](../additional/operations.md#http) or @ref:[JMX](../additional/operations.md#jmx).
Note that @ref:[Cluster Singleton](cluster-singleton.md) or @ref:[Cluster Sharding entities](cluster-sharding.md) that
are running on a crashed (unreachable) node will not be started on another node until the previous node has
been removed from the Cluster. Removal of crashed (unreachable) nodes is performed after a downing decision.
A production solution for downing is provided by
[Split Brain Resolver](https://doc.akka.io/docs/akka-enhancements/current/split-brain-resolver.html),
which is part of the [Akka Platform](https://www.lightbend.com/akka-platform).
If you dont have a Lightbend Subscription, you should still carefully read the
[documentation](https://doc.akka.io/docs/akka-enhancements/current/split-brain-resolver.html)
of the Split Brain Resolver and make sure that the solution you are using handles the concerns and scenarios
described there.
A custom downing strategy can be implemented with a @apidoc[akka.cluster.DowningProvider] and enabled with
configuration `akka.cluster.downing-provider-class`.
Downing can also be performed programmatically with @scala[`Cluster(system).manager ! Down(address)`]@java[`Cluster.get(system).manager().tell(Down(address))`],
but that is mostly useful from tests and when implementing a `DowningProvider`.

View file

@ -18,7 +18,6 @@ With a [Lightbend Platform Subscription](https://www.lightbend.com/lightbend-sub
[Akka Resilience Enhancements](https://doc.akka.io/docs/akka-enhancements/current/akka-resilience-enhancements.html):
* [Split Brain Resolver](https://doc.akka.io/docs/akka-enhancements/current/split-brain-resolver.html) &#8212; Detects and recovers from network partitions, eliminating data inconsistencies and possible downtime.
* [Configuration Checker](https://doc.akka.io/docs/akka-enhancements/current/config-checker.html) &#8212; Checks for potential configuration issues and logs suggestions.
* [Diagnostics Recorder](https://doc.akka.io/docs/akka-enhancements/current/diagnostics-recorder.html) &#8212; Captures configuration and system information in a format that makes it easy to troubleshoot issues during development and production.
* [Thread Starvation Detector](https://doc.akka.io/docs/akka-enhancements/current/starvation-detector.html) &#8212; Monitors an Akka system dispatcher and logs warnings if it becomes unresponsive.

View file

@ -25,6 +25,7 @@ project.description: Akka Cluster concepts, node membership service, CRDT Distri
* [multi-node-testing](../multi-node-testing.md)
* [remoting-artery](../remoting-artery.md)
* [remoting](../remoting.md)
* [split-brain-resolver](../split-brain-resolver.md)
* [coordination](../coordination.md)
* [choosing-cluster](choosing-cluster.md)

View file

@ -2,14 +2,14 @@
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.coordination.lease;
package jdocs.coordination;
import akka.actor.ActorSystem;
import akka.coordination.lease.LeaseSettings;
import akka.coordination.lease.javadsl.Lease;
import akka.coordination.lease.javadsl.LeaseProvider;
import akka.testkit.javadsl.TestKit;
import docs.akka.coordination.LeaseDocSpec;
import docs.coordination.LeaseDocSpec;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

View file

@ -2,15 +2,17 @@
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.coordination
package docs.coordination
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.cluster.Cluster
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
import akka.coordination.lease.scaladsl.Lease
import akka.coordination.lease.scaladsl.LeaseProvider
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
//#lease-example
class SampleLease(settings: LeaseSettings) extends Lease(settings) {
@ -37,11 +39,11 @@ object LeaseDocSpec {
def config() =
ConfigFactory.parseString("""
jdocs-lease.lease-class = "jdocs.akka.coordination.lease.LeaseDocTest$SampleLease"
jdocs-lease.lease-class = "jdocs.coordination.LeaseDocTest$SampleLease"
#lease-config
akka.actor.provider = cluster
docs-lease {
lease-class = "docs.akka.coordination.SampleLease"
lease-class = "docs.coordination.SampleLease"
heartbeat-timeout = 100s
heartbeat-interval = 1s
lease-operation-timeout = 1s

View file

@ -119,7 +119,12 @@ lazy val benchJmh = akkaModule("akka-bench-jmh")
.disablePlugins(MimaPlugin, WhiteSourcePlugin, ValidatePullRequest, CopyrightHeaderInPr)
lazy val cluster = akkaModule("akka-cluster")
.dependsOn(remote, remoteTests % "test->test", testkit % "test->test", jackson % "test->test")
.dependsOn(
remote,
coordination % "compile->compile;test->test",
remoteTests % "test->test",
testkit % "test->test",
jackson % "test->test")
.settings(Dependencies.cluster)
.settings(AutomaticModuleName.settings("akka.cluster"))
.settings(OSGi.cluster)
@ -161,7 +166,10 @@ lazy val clusterSharding = akkaModule("akka-cluster-sharding")
.enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams)
lazy val clusterTools = akkaModule("akka-cluster-tools")
.dependsOn(cluster % "compile->compile;test->test;multi-jvm->multi-jvm", coordination, jackson % "test->test")
.dependsOn(
cluster % "compile->compile;test->test;multi-jvm->multi-jvm",
coordination % "compile->compile;test->test",
jackson % "test->test")
.settings(Dependencies.clusterTools)
.settings(AutomaticModuleName.settings("akka.cluster.tools"))
.settings(OSGi.clusterTools)
@ -500,7 +508,7 @@ lazy val discovery = akkaModule("akka-discovery")
.settings(OSGi.discovery)
lazy val coordination = akkaModule("akka-coordination")
.dependsOn(actor, testkit % "test->test", actorTests % "test->test", cluster % "test->test")
.dependsOn(actor, testkit % "test->test", actorTests % "test->test")
.settings(Dependencies.coordination)
.settings(AutomaticModuleName.settings("akka.coordination"))
.settings(OSGi.coordination)