SBR downing when a node is quarantined from the other side (#29737)

* SBR now downs a node when it notices that it has been quarantined from other nodes #29565

* MiMa excludes

* Review feedback mostly addressed

* One more stale comment removed

* More stress

* Ignore if remote quarantining node is not part of cluster

* Preliminary (untested) keepalive server support

* Completed reproducer of scenario discussed in PR

* Fix incorrect extends in multi-jvm tests

* Put the test transport dropping stage after the control junction so that control messages are also dropped on blackhole.

* Test cleanup/review feedback addressed

* Ping from both nodes of side 1

Co-authored-by: Renato Cavalcanti <renato@cavalcanti.be>

* Add some debug logging to test to nail down failure cause

* Log when InboundTestStage lets messages through because no association yet

Co-authored-by: Renato Cavalcanti <renato@cavalcanti.be>
Johan Andrén 2021-01-28 10:49:15 +01:00 committed by GitHub
parent 0d64ac3cd7
commit 26c56e5825
9 changed files with 370 additions and 47 deletions
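
The gist of the change: the SplitBrainResolver actor now subscribes to Artery's ThisActorSystemQuarantinedEvent and, when the quarantining node is a known cluster member, downs itself via the new DownSelfQuarantinedByRemote decision. The sketch below illustrates that mechanism in isolation; it is a simplified stand-in (the class name DownSelfWhenQuarantinedSketch is made up here), not the actual implementation, which routes the decision through the DowningStrategy machinery shown in the diffs that follow. Note that ThisActorSystemQuarantinedEvent is internal Artery API, so this is for illustration rather than application code.

// Minimal sketch, assuming a simplified stand-in for the real SplitBrainResolver change.
// ThisActorSystemQuarantinedEvent is internal Artery API and only published by Artery.
import akka.actor.{ Actor, ActorLogging, Props }
import akka.cluster.{ Cluster, UniqueAddress }
import akka.remote.artery.ThisActorSystemQuarantinedEvent

class DownSelfWhenQuarantinedSketch extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)

  override def preStart(): Unit =
    context.system.eventStream.subscribe(self, classOf[ThisActorSystemQuarantinedEvent])

  override def postStop(): Unit =
    context.system.eventStream.unsubscribe(self, classOf[ThisActorSystemQuarantinedEvent])

  def receive: Receive = {
    case ThisActorSystemQuarantinedEvent(_, remote) =>
      // translate akka.remote.UniqueAddress to the cluster UniqueAddress
      val remoteMember = UniqueAddress(remote.address, remote.uid)
      if (cluster.state.members.exists(_.uniqueAddress == remoteMember)) {
        // quarantined by a current cluster member: this node can never rejoin, so down itself
        log.warning("Quarantined by cluster member [{}], downing self", remoteMember)
        cluster.down(cluster.selfAddress)
      } else {
        // a quarantine claim from a node that is not a member cannot be trusted, ignore it
        log.debug("Quarantined by [{}] which is not part of the cluster, ignoring", remoteMember)
      }
  }
}

object DownSelfWhenQuarantinedSketch {
  def props: Props = Props(new DownSelfWhenQuarantinedSketch)
}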


@ -0,0 +1,7 @@
# internals only
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.KeepMajority.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.DownAllNodes.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.StaticQuorum.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.DowningStrategy.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.LeaseMajority.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.KeepOldest.this")


@ -7,7 +7,6 @@ package akka.cluster.sbr
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import akka.actor.Address
import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
@ -49,12 +48,15 @@ import akka.coordination.lease.scaladsl.Lease
case object ReverseDownIndirectlyConnected extends Decision {
override def isIndirectlyConnected = true
}
case object DownSelfQuarantinedByRemote extends Decision {
override def isIndirectlyConnected: Boolean = false
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] abstract class DowningStrategy(val selfDc: DataCenter) {
@InternalApi private[akka] abstract class DowningStrategy(val selfDc: DataCenter, selfUniqueAddress: UniqueAddress) {
import DowningStrategy._
// may contain Joining and WeaklyUp
@ -273,6 +275,9 @@ import akka.coordination.lease.scaladsl.Lease
case ReverseDownIndirectlyConnected =>
// indirectly connected + all reachable
downable.intersect(indirectlyConnected).union(downable.diff(unreachable))
case DownSelfQuarantinedByRemote =>
if (downable.contains(selfUniqueAddress)) Set(selfUniqueAddress)
else Set.empty
}
}
@ -321,6 +326,8 @@ import akka.coordination.lease.scaladsl.Lease
case DownIndirectlyConnected => ReverseDownIndirectlyConnected
case AcquireLeaseAndDownIndirectlyConnected(_) => ReverseDownIndirectlyConnected
case ReverseDownIndirectlyConnected => DownIndirectlyConnected
case DownSelfQuarantinedByRemote =>
throw new IllegalArgumentException("Not expected to ever try to reverse DownSelfQuarantinedByRemote")
}
}
@ -353,8 +360,9 @@ import akka.coordination.lease.scaladsl.Lease
@InternalApi private[sbr] final class StaticQuorum(
selfDc: DataCenter,
val quorumSize: Int,
override val role: Option[String])
extends DowningStrategy(selfDc) {
override val role: Option[String],
selfUniqueAddress: UniqueAddress)
extends DowningStrategy(selfDc, selfUniqueAddress) {
import DowningStrategy._
override def decide(): Decision = {
@ -386,8 +394,11 @@ import akka.coordination.lease.scaladsl.Lease
*
* It is only counting members within the own data center.
*/
@InternalApi private[sbr] final class KeepMajority(selfDc: DataCenter, override val role: Option[String])
extends DowningStrategy(selfDc) {
@InternalApi private[sbr] final class KeepMajority(
selfDc: DataCenter,
override val role: Option[String],
selfUniqueAddress: UniqueAddress)
extends DowningStrategy(selfDc, selfUniqueAddress) {
import DowningStrategy._
override def decide(): Decision = {
@ -475,8 +486,9 @@ import akka.coordination.lease.scaladsl.Lease
@InternalApi private[sbr] final class KeepOldest(
selfDc: DataCenter,
val downIfAlone: Boolean,
override val role: Option[String])
extends DowningStrategy(selfDc) {
override val role: Option[String],
selfUniqueAddress: UniqueAddress)
extends DowningStrategy(selfDc, selfUniqueAddress) {
import DowningStrategy._
// sort by age, oldest first
@ -552,7 +564,8 @@ import akka.coordination.lease.scaladsl.Lease
*
* Down all nodes unconditionally.
*/
@InternalApi private[sbr] final class DownAllNodes(selfDc: DataCenter) extends DowningStrategy(selfDc) {
@InternalApi private[sbr] final class DownAllNodes(selfDc: DataCenter, selfUniqueAddress: UniqueAddress)
extends DowningStrategy(selfDc, selfUniqueAddress) {
import DowningStrategy._
override def decide(): Decision =
@ -577,8 +590,9 @@ import akka.coordination.lease.scaladsl.Lease
selfDc: DataCenter,
override val role: Option[String],
_lease: Lease,
acquireLeaseDelayForMinority: FiniteDuration)
extends DowningStrategy(selfDc) {
acquireLeaseDelayForMinority: FiniteDuration,
selfUniqueAddress: UniqueAddress)
extends DowningStrategy(selfDc, selfUniqueAddress) {
import DowningStrategy._
override val lease: Option[Lease] = Some(_lease)


@ -9,7 +9,6 @@ import java.time.temporal.ChronoUnit
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.Address
import akka.actor.ExtendedActorSystem
@ -30,6 +29,7 @@ import akka.cluster.sbr.DowningStrategy.Decision
import akka.event.DiagnosticMarkerBusLoggingAdapter
import akka.event.Logging
import akka.pattern.pipe
import akka.remote.artery.ThisActorSystemQuarantinedEvent
/**
* INTERNAL API
@ -111,10 +111,13 @@ import akka.pattern.pipe
// re-subscribe when restart
override def preStart(): Unit = {
cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[ClusterDomainEvent])
// note that this is artery only
context.system.eventStream.subscribe(self, classOf[ThisActorSystemQuarantinedEvent])
super.preStart()
}
override def postStop(): Unit = {
cluster.unsubscribe(self)
context.system.eventStream.unsubscribe(self, classOf[ThisActorSystemQuarantinedEvent])
super.postStop()
}
@ -261,23 +264,24 @@ import akka.pattern.pipe
}
def receive: Receive = {
case SeenChanged(_, seenBy) => seenChanged(seenBy)
case MemberJoined(m) => addJoining(m)
case MemberWeaklyUp(m) => addWeaklyUp(m)
case MemberUp(m) => addUp(m)
case MemberLeft(m) => leaving(m)
case UnreachableMember(m) => unreachableMember(m)
case MemberDowned(m) => unreachableMember(m)
case MemberExited(m) => unreachableMember(m)
case ReachableMember(m) => reachableMember(m)
case ReachabilityChanged(r) => reachabilityChanged(r)
case MemberRemoved(m, _) => remove(m)
case UnreachableDataCenter(dc) => unreachableDataCenter(dc)
case ReachableDataCenter(dc) => reachableDataCenter(dc)
case LeaderChanged(leaderOption) => leaderChanged(leaderOption)
case ReleaseLeaseResult(released) => releaseLeaseResult(released)
case Tick => tick()
case _: ClusterDomainEvent => // not interested in other events
case SeenChanged(_, seenBy) => seenChanged(seenBy)
case MemberJoined(m) => addJoining(m)
case MemberWeaklyUp(m) => addWeaklyUp(m)
case MemberUp(m) => addUp(m)
case MemberLeft(m) => leaving(m)
case UnreachableMember(m) => unreachableMember(m)
case MemberDowned(m) => unreachableMember(m)
case MemberExited(m) => unreachableMember(m)
case ReachableMember(m) => reachableMember(m)
case ReachabilityChanged(r) => reachabilityChanged(r)
case MemberRemoved(m, _) => remove(m)
case UnreachableDataCenter(dc) => unreachableDataCenter(dc)
case ReachableDataCenter(dc) => reachableDataCenter(dc)
case LeaderChanged(leaderOption) => leaderChanged(leaderOption)
case ReleaseLeaseResult(released) => releaseLeaseResult(released)
case Tick => tick()
case ThisActorSystemQuarantinedEvent(_, remote) => thisActorSystemWasQuarantined(remote)
case _: ClusterDomainEvent => // not interested in other events
}
private def leaderChanged(leaderOption: Option[Address]): Unit = {
@ -346,6 +350,15 @@ import akka.pattern.pipe
}
}
private def thisActorSystemWasQuarantined(remoteUnique: akka.remote.UniqueAddress): Unit = {
val remote = UniqueAddress(remoteUnique.address, remoteUnique.uid)
if (Cluster(context.system).state.members.exists(m => m.uniqueAddress == remote)) {
actOnDecision(DowningStrategy.DownSelfQuarantinedByRemote)
} else {
log.debug("Remote [{}] quarantined this system but is not part of cluster, ignoring", remote)
}
}
private def acquireLease(): Unit = {
log.debug("SBR trying to acquire lease")
implicit val ec: ExecutionContext = internalDispatcher


@ -41,20 +41,20 @@ final class SplitBrainResolverProvider(system: ActorSystem) extends DowningProvi
val strategy =
settings.DowningStrategy match {
case KeepMajorityName =>
new KeepMajority(selfDc, settings.keepMajorityRole)
new KeepMajority(selfDc, settings.keepMajorityRole, cluster.selfUniqueAddress)
case StaticQuorumName =>
val s = settings.staticQuorumSettings
new StaticQuorum(selfDc, s.size, s.role)
new StaticQuorum(selfDc, s.size, s.role, cluster.selfUniqueAddress)
case KeepOldestName =>
val s = settings.keepOldestSettings
new KeepOldest(selfDc, s.downIfAlone, s.role)
new KeepOldest(selfDc, s.downIfAlone, s.role, cluster.selfUniqueAddress)
case DownAllName =>
new DownAllNodes(selfDc)
new DownAllNodes(selfDc, cluster.selfUniqueAddress)
case LeaseMajorityName =>
val s = settings.leaseMajoritySettings
val leaseOwnerName = cluster.selfUniqueAddress.address.hostPort
val lease = LeaseProvider(system).getLease(s"${system.name}-akka-sbr", s.leaseImplementation, leaseOwnerName)
new LeaseMajority(selfDc, s.role, lease, s.acquireLeaseDelayForMinority)
new LeaseMajority(selfDc, s.role, lease, s.acquireLeaseDelayForMinority, cluster.selfUniqueAddress)
}
Some(SplitBrainResolver.props(settings.DowningStableAfter, strategy))


@ -0,0 +1,136 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.RootActorPath
import scala.concurrent.duration._
import akka.remote.artery.ArterySettings
import akka.remote.artery.ThisActorSystemQuarantinedEvent
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter
import akka.testkit.LongRunningTest
import com.typesafe.config.ConfigFactory
object DowningWhenOtherHasQuarantinedThisActorSystemSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(
debugConfig(on = false)
.withFallback(MultiNodeClusterSpec.clusterConfig)
.withFallback(
ConfigFactory.parseString("""
akka.remote.artery.enabled = on
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
# speed up decision
akka.cluster.split-brain-resolver.stable-after = 5s
""")))
// exaggerate the timing issue by making the second node decide slower
// this is to more consistently repeat the scenario where the other side completes downing
// while the isolated part still has not made a decision and then sees quarantined connections from the other nodes
nodeConfig(second)(ConfigFactory.parseString("akka.cluster.split-brain-resolver.stable-after = 15s"))
testTransport(on = true)
}
class DowningWhenOtherHasQuarantinedThisActorSystemMultiJvmNode1
extends DowningWhenOtherHasQuarantinedThisActorSystemSpec
class DowningWhenOtherHasQuarantinedThisActorSystemMultiJvmNode2
extends DowningWhenOtherHasQuarantinedThisActorSystemSpec
class DowningWhenOtherHasQuarantinedThisActorSystemMultiJvmNode3
extends DowningWhenOtherHasQuarantinedThisActorSystemSpec
abstract class DowningWhenOtherHasQuarantinedThisActorSystemSpec
extends MultiNodeSpec(DowningWhenOtherHasQuarantinedThisActorSystemSpec)
with MultiNodeClusterSpec {
import DowningWhenOtherHasQuarantinedThisActorSystemSpec._
"Cluster node downed by other" must {
if (!ArterySettings(system.settings.config.getConfig("akka.remote.artery")).Enabled) {
// this feature only works with Artery, because classic remoting will not accept connections from
// a quarantined node, and changing that would carry too high a risk of introducing regressions
pending
}
"join cluster" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
enterBarrier("after-1")
}
"down itself" taggedAs LongRunningTest in {
runOn(first) {
testConductor.blackhole(first, second, ThrottlerTransportAdapter.Direction.Both).await
testConductor.blackhole(third, second, ThrottlerTransportAdapter.Direction.Both).await
}
enterBarrier("blackhole")
within(15.seconds) {
runOn(first) {
awaitAssert {
cluster.state.unreachable.map(_.address) should ===(Set(address(second)))
}
awaitAssert {
// second downed and removed
cluster.state.members.map(_.address) should ===(Set(address(first), address(third)))
}
}
runOn(second) {
awaitAssert {
cluster.state.unreachable.map(_.address) should ===(Set(address(first), address(third)))
}
}
}
enterBarrier("down-second")
runOn(first) {
testConductor.passThrough(first, second, ThrottlerTransportAdapter.Direction.Both).await
testConductor.passThrough(third, second, ThrottlerTransportAdapter.Direction.Both).await
}
enterBarrier("pass-through")
runOn(second) {
within(10.seconds) {
awaitAssert {
// try to ping first (Cluster Heartbeat messages will not trigger the Quarantine message)
system.actorSelection(RootActorPath(first) / "user").tell(Identify(None), ActorRef.noSender)
// shutting down itself triggered by ThisActorSystemQuarantinedEvent
cluster.isTerminated should ===(true)
}
}
}
enterBarrier("after-2")
}
"not be triggered by another node shutting down" taggedAs LongRunningTest in {
runOn(first) {
system.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
}
enterBarrier("subscribing")
runOn(third) {
cluster.shutdown()
}
runOn(first) {
val sel = system.actorSelection(RootActorPath(third) / "user")
(1 to 25).foreach { _ =>
sel.tell(Identify(None), ActorRef.noSender) // try to ping third
expectNoMessage(200.millis) // no ThisActorSystemQuarantinedEvent
}
}
enterBarrier("after-2")
}
}
}


@ -0,0 +1,141 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.RootActorPath
import akka.remote.artery.ArterySettings
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter
import akka.testkit.LongRunningTest
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object SplitBrainQuarantineSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
testTransport(on = true)
commonConfig(
debugConfig(on = true)
.withFallback(MultiNodeClusterSpec.clusterConfig)
.withFallback(ConfigFactory.parseString(
"""
akka.remote.artery.enabled = on
akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
# we don't really want this to hit, but we need the SBR enabled to verify that the
# quarantine-triggered downing does not happen
akka.cluster.split-brain-resolver.stable-after = 5 minutes
akka.cluster.debug.verbose-gossip-logging = on
""")))
}
class SplitBrainQuarantineMultiJvmNode1 extends SplitBrainQuarantineSpec
class SplitBrainQuarantineMultiJvmNode2 extends SplitBrainQuarantineSpec
class SplitBrainQuarantineMultiJvmNode3 extends SplitBrainQuarantineSpec
class SplitBrainQuarantineMultiJvmNode4 extends SplitBrainQuarantineSpec
abstract class SplitBrainQuarantineSpec extends MultiNodeSpec(SplitBrainQuarantineSpec) with MultiNodeClusterSpec {
import SplitBrainQuarantineSpec._
// reproduces the scenario where the cluster is partitioned and each side (incorrectly) downs the other,
// and after that the partition heals and the two split-brain halves reconnect
"Cluster node downed by other" must {
if (!ArterySettings(system.settings.config.getConfig("akka.remote.artery")).Enabled) {
// this feature only works with Artery, because classic remoting will not accept connections from
// a quarantined node, and changing that would carry too high a risk of introducing regressions
pending
}
"join cluster" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third, fourth)
enterBarrier("after-1")
}
"split brain" taggedAs LongRunningTest in {
runOn(first) {
testConductor.blackhole(first, third, ThrottlerTransportAdapter.Direction.Both).await
testConductor.blackhole(first, fourth, ThrottlerTransportAdapter.Direction.Both).await
testConductor.blackhole(second, third, ThrottlerTransportAdapter.Direction.Both).await
testConductor.blackhole(second, fourth, ThrottlerTransportAdapter.Direction.Both).await
}
enterBarrier("blackhole")
system.log.info("cluster split into [JVM-1, JVM-2] and [JVM-3, JVM-4] with blackhole")
within(15.seconds) {
runOn(first) {
cluster.down(third)
cluster.down(fourth)
}
runOn(first, second) {
awaitAssert {
cluster.state.members.collect { case m if m.status == MemberStatus.Up => m.address } should ===(
Set(address(first), address(second)))
cluster.state.members.size should ===(2)
}
system.log.info("JVM-3 and JVM-4 downed from JVM-1")
}
runOn(third) {
cluster.down(first)
cluster.down(second)
}
runOn(third, fourth) {
awaitAssert {
cluster.state.members.collect { case m if m.status == MemberStatus.Up => m.address } should ===(
Set(address(third), address(fourth)))
cluster.state.members.size should ===(2)
}
system.log.info("JVM-1 and JVM-2 downed from JVM-3")
}
}
enterBarrier("brain-split")
runOn(first) {
system.log.info("unblackholing cluster")
testConductor.passThrough(first, third, ThrottlerTransportAdapter.Direction.Both).await
testConductor.passThrough(first, fourth, ThrottlerTransportAdapter.Direction.Both).await
testConductor.passThrough(second, third, ThrottlerTransportAdapter.Direction.Both).await
testConductor.passThrough(second, fourth, ThrottlerTransportAdapter.Direction.Both).await
}
enterBarrier("unblackholed")
// must send some actual messages that would trigger Quarantine to be sure it does in fact not happen
runOn(first, second) {
system.actorSelection(RootActorPath(third) / "user").tell(Identify(None), ActorRef.noSender)
system.actorSelection(RootActorPath(fourth) / "user").tell(Identify(None), ActorRef.noSender)
}
runOn(third, fourth) {
system.actorSelection(RootActorPath(first) / "user").tell(Identify(None), ActorRef.noSender)
system.actorSelection(RootActorPath(second) / "user").tell(Identify(None), ActorRef.noSender)
}
Thread.sleep(3000)
runOn(first, second) {
system.actorSelection(RootActorPath(third) / "user").tell(Identify(None), ActorRef.noSender)
system.actorSelection(RootActorPath(fourth) / "user").tell(Identify(None), ActorRef.noSender)
}
runOn(third, fourth) {
system.actorSelection(RootActorPath(first) / "user").tell(Identify(None), ActorRef.noSender)
system.actorSelection(RootActorPath(second) / "user").tell(Identify(None), ActorRef.noSender)
}
Thread.sleep(5000)
enterBarrier("after-pass-through")
// since the side that would quarantine each node is no longer part of the cluster, this is the same as
// a random node connecting and claiming that a node is quarantined, and therefore it cannot be trusted
// enough to trigger termination via ThisActorSystemQuarantinedEvent
cluster.isTerminated should ===(false)
enterBarrier("verify-alive")
}
}
}


@ -112,6 +112,7 @@ class SplitBrainResolverSpec
import TestAddresses._
private val selfDc = TestAddresses.defaultDataCenter
private lazy val selfUniqueAddress = Cluster(system).selfUniqueAddress
private val testLeaseSettings =
new LeaseSettings("akka-sbr", "test", new TimeoutSettings(1.second, 2.minutes, 3.seconds), ConfigFactory.empty)
@ -180,7 +181,7 @@ class SplitBrainResolverSpec
"StaticQuorum" must {
class Setup2(size: Int, role: Option[String]) extends StrategySetup {
override def createStrategy() =
new StaticQuorum(selfDc, size, role)
new StaticQuorum(selfDc, size, role, selfUniqueAddress)
}
"down unreachable when enough reachable nodes" in new Setup2(3, None) {
@ -257,7 +258,7 @@ class SplitBrainResolverSpec
"KeepMajority" must {
class Setup2(role: Option[String]) extends StrategySetup {
override def createStrategy() =
new KeepMajority(selfDc, role)
new KeepMajority(selfDc, role, selfUniqueAddress)
}
"down minority partition: {A, C, E} | {B, D} => {A, C, E}" in new Setup2(role = None) {
@ -593,7 +594,7 @@ class SplitBrainResolverSpec
"KeepOldest" must {
class Setup2(downIfAlone: Boolean = true, role: Option[String] = None) extends StrategySetup {
override def createStrategy() = new KeepOldest(selfDc, downIfAlone, role)
override def createStrategy() = new KeepOldest(selfDc, downIfAlone, role, selfUniqueAddress)
}
"keep partition with oldest" in new Setup2 {
@ -803,7 +804,7 @@ class SplitBrainResolverSpec
"DownAllNodes" must {
class Setup2 extends StrategySetup {
override def createStrategy() = new DownAllNodes(selfDc)
override def createStrategy() = new DownAllNodes(selfDc, selfUniqueAddress)
}
"down all" in new Setup2 {
@ -826,7 +827,7 @@ class SplitBrainResolverSpec
val acquireLeaseDelayForMinority: FiniteDuration = 2.seconds
override def createStrategy() =
new LeaseMajority(selfDc, role, testLease, acquireLeaseDelayForMinority)
new LeaseMajority(selfDc, role, testLease, acquireLeaseDelayForMinority, selfUniqueAddress)
}
"decide AcquireLeaseAndDownUnreachable, and DownReachable as reverse decision" in {
@ -935,11 +936,11 @@ class SplitBrainResolverSpec
"Strategy" must {
class MajoritySetup(role: Option[String] = None) extends StrategySetup {
override def createStrategy() = new KeepMajority(selfDc, role)
override def createStrategy() = new KeepMajority(selfDc, role, selfUniqueAddress)
}
class OldestSetup(role: Option[String] = None) extends StrategySetup {
override def createStrategy() = new KeepOldest(selfDc, downIfAlone = true, role)
override def createStrategy() = new KeepOldest(selfDc, downIfAlone = true, role, selfUniqueAddress)
}
"add and remove members with default Member ordering" in {
@ -1081,24 +1082,29 @@ class SplitBrainResolverSpec
role: Option[String],
downAllWhenUnstable: FiniteDuration = Duration.Zero,
tickInterval: FiniteDuration = Duration.Zero)
extends Setup(stableAfter, new KeepMajority(selfDc, role), selfUniqueAddress, downAllWhenUnstable, tickInterval)
extends Setup(
stableAfter,
new KeepMajority(selfDc, role, selfUniqueAddress),
selfUniqueAddress,
downAllWhenUnstable,
tickInterval)
class SetupKeepOldest(
stableAfter: FiniteDuration,
selfUniqueAddress: UniqueAddress,
downIfAlone: Boolean,
role: Option[String])
extends Setup(stableAfter, new KeepOldest(selfDc, downIfAlone, role), selfUniqueAddress)
extends Setup(stableAfter, new KeepOldest(selfDc, downIfAlone, role, selfUniqueAddress), selfUniqueAddress)
class SetupStaticQuorum(
stableAfter: FiniteDuration,
selfUniqueAddress: UniqueAddress,
size: Int,
role: Option[String])
extends Setup(stableAfter, new StaticQuorum(selfDc, size, role), selfUniqueAddress)
extends Setup(stableAfter, new StaticQuorum(selfDc, size, role, selfUniqueAddress), selfUniqueAddress)
class SetupDownAllNodes(stableAfter: FiniteDuration, selfUniqueAddress: UniqueAddress)
extends Setup(stableAfter, new DownAllNodes(selfDc), selfUniqueAddress)
extends Setup(stableAfter, new DownAllNodes(selfDc, selfUniqueAddress), selfUniqueAddress)
class SetupLeaseMajority(
stableAfter: FiniteDuration,
@ -1109,7 +1115,7 @@ class SplitBrainResolverSpec
tickInterval: FiniteDuration = Duration.Zero)
extends Setup(
stableAfter,
new LeaseMajority(selfDc, role, testLease, acquireLeaseDelayForMinority = 20.millis),
new LeaseMajority(selfDc, role, testLease, acquireLeaseDelayForMinority = 20.millis, selfUniqueAddress),
selfUniqueAddress,
downAllWhenUnstable,
tickInterval)


@ -809,9 +809,9 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
system.deadLetters,
settings.Advanced.SystemMessageResendInterval,
settings.Advanced.SysMsgBufferSize))
.viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right)
// note that System messages must not be dropped before the SystemMessageDelivery stage
.via(outboundTestFlow(outboundContext))
.viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right)
.via(createEncoder(envelopeBufferPool, ControlStreamId))
.toMat(outboundTransportSink(outboundContext, ControlStreamId, envelopeBufferPool))(Keep.both)


@ -33,6 +33,8 @@ private[remote] class SharedTestState {
private val state = new AtomicReference[TestState](TestState(Map.empty, None))
def anyBlackholePresent(): Boolean = state.get.blackholes.nonEmpty
def isBlackhole(from: Address, to: Address): Boolean =
state.get.blackholes.get(from) match {
case Some(destinations) => destinations(to)
@ -161,6 +163,10 @@ private[remote] class InboundTestStage(inboundContext: InboundContext, state: Sh
env.association match {
case OptionVal.None =>
// unknown, handshake not completed
if (state.anyBlackholePresent())
log.debug(
"inbound message [{}] before handshake completed, cannot check if remote is blackholed, letting through",
Logging.messageClassName(env.message))
push(out, env)
case OptionVal.Some(association) =>
if (state.isBlackhole(inboundContext.localAddress.address, association.remoteAddress)) {