diff --git a/akka-cluster/src/main/mima-filters/2.6.10.backwards.excludes/down-when-quarantined.excludes b/akka-cluster/src/main/mima-filters/2.6.10.backwards.excludes/down-when-quarantined.excludes
new file mode 100644
index 0000000000..2f9cd62459
--- /dev/null
+++ b/akka-cluster/src/main/mima-filters/2.6.10.backwards.excludes/down-when-quarantined.excludes
@@ -0,0 +1,7 @@
+# internals only
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.KeepMajority.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.DownAllNodes.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.StaticQuorum.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.DowningStrategy.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.LeaseMajority.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sbr.KeepOldest.this")
\ No newline at end of file
diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala
index 8c09a4ba1d..39003a1cfd 100644
--- a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala
@@ -7,7 +7,6 @@ package akka.cluster.sbr
 import scala.collection.immutable
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
-
 import akka.actor.Address
 import akka.annotation.InternalApi
 import akka.annotation.InternalStableApi
@@ -49,12 +48,15 @@ import akka.coordination.lease.scaladsl.Lease
   case object ReverseDownIndirectlyConnected extends Decision {
     override def isIndirectlyConnected = true
   }
+  case object DownSelfQuarantinedByRemote extends Decision {
+    override def isIndirectlyConnected: Boolean = false
+  }
 }
 
 /**
  * INTERNAL API
 */
-@InternalApi private[akka] abstract class DowningStrategy(val selfDc: DataCenter) {
+@InternalApi private[akka] abstract class DowningStrategy(val selfDc: DataCenter, selfUniqueAddress: UniqueAddress) {
   import DowningStrategy._
 
   // may contain Joining and WeaklyUp
@@ -273,6 +275,9 @@ import akka.coordination.lease.scaladsl.Lease
       case ReverseDownIndirectlyConnected =>
         // indirectly connected + all reachable
         downable.intersect(indirectlyConnected).union(downable.diff(unreachable))
+      case DownSelfQuarantinedByRemote =>
+        if (downable.contains(selfUniqueAddress)) Set(selfUniqueAddress)
+        else Set.empty
     }
   }
 
@@ -321,6 +326,8 @@ import akka.coordination.lease.scaladsl.Lease
       case DownIndirectlyConnected                   => ReverseDownIndirectlyConnected
       case AcquireLeaseAndDownIndirectlyConnected(_) => ReverseDownIndirectlyConnected
       case ReverseDownIndirectlyConnected            => DownIndirectlyConnected
+      case DownSelfQuarantinedByRemote =>
+        throw new IllegalArgumentException("Not expected to ever try to reverse DownSelfQuarantinedByRemote")
     }
   }
 
@@ -353,8 +360,9 @@ import akka.coordination.lease.scaladsl.Lease
 @InternalApi private[sbr] final class StaticQuorum(
     selfDc: DataCenter,
     val quorumSize: Int,
-    override val role: Option[String])
-    extends DowningStrategy(selfDc) {
+    override val role: Option[String],
+    selfUniqueAddress: UniqueAddress)
+    extends DowningStrategy(selfDc, selfUniqueAddress) {
   import DowningStrategy._
 
   override def decide(): Decision = {
@@ -386,8 +394,11 @@ import akka.coordination.lease.scaladsl.Lease
  *
  * It is only counting members within the own data center.
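  *
  * A node that has been quarantined by the other side can never come back into the
  * cluster, so the DownSelfQuarantinedByRemote decision introduced above downs this
  * node only (see nodesToDown), independently of the majority calculation.
  * For reference, a strategy is selected through the standard SBR configuration
  * (shown as a sketch here, not part of this patch):
  * {{{
  * akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  * akka.cluster.split-brain-resolver.active-strategy = keep-majority
  * }}}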
  */
-@InternalApi private[sbr] final class KeepMajority(selfDc: DataCenter, override val role: Option[String])
-    extends DowningStrategy(selfDc) {
+@InternalApi private[sbr] final class KeepMajority(
+    selfDc: DataCenter,
+    override val role: Option[String],
+    selfUniqueAddress: UniqueAddress)
+    extends DowningStrategy(selfDc, selfUniqueAddress) {
   import DowningStrategy._
 
   override def decide(): Decision = {
@@ -475,8 +486,9 @@ import akka.coordination.lease.scaladsl.Lease
 @InternalApi private[sbr] final class KeepOldest(
     selfDc: DataCenter,
     val downIfAlone: Boolean,
-    override val role: Option[String])
-    extends DowningStrategy(selfDc) {
+    override val role: Option[String],
+    selfUniqueAddress: UniqueAddress)
+    extends DowningStrategy(selfDc, selfUniqueAddress) {
   import DowningStrategy._
 
   // sort by age, oldest first
@@ -552,7 +564,8 @@ import akka.coordination.lease.scaladsl.Lease
  *
  * Down all nodes unconditionally.
  */
-@InternalApi private[sbr] final class DownAllNodes(selfDc: DataCenter) extends DowningStrategy(selfDc) {
+@InternalApi private[sbr] final class DownAllNodes(selfDc: DataCenter, selfUniqueAddress: UniqueAddress)
+    extends DowningStrategy(selfDc, selfUniqueAddress) {
   import DowningStrategy._
 
   override def decide(): Decision =
@@ -577,8 +590,9 @@ import akka.coordination.lease.scaladsl.Lease
     selfDc: DataCenter,
     override val role: Option[String],
     _lease: Lease,
-    acquireLeaseDelayForMinority: FiniteDuration)
-    extends DowningStrategy(selfDc) {
+    acquireLeaseDelayForMinority: FiniteDuration,
+    selfUniqueAddress: UniqueAddress)
+    extends DowningStrategy(selfDc, selfUniqueAddress) {
   import DowningStrategy._
 
   override val lease: Option[Lease] = Some(_lease)
diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala
index 382ae3f88e..27d5d9a14d 100644
--- a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala
@@ -9,7 +9,6 @@ import java.time.temporal.ChronoUnit
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
-
 import akka.actor.Actor
 import akka.actor.Address
 import akka.actor.ExtendedActorSystem
@@ -30,6 +29,7 @@ import akka.cluster.sbr.DowningStrategy.Decision
 import akka.event.DiagnosticMarkerBusLoggingAdapter
 import akka.event.Logging
 import akka.pattern.pipe
+import akka.remote.artery.ThisActorSystemQuarantinedEvent
 
 /**
  * INTERNAL API
@@ -111,10 +111,13 @@ import akka.pattern.pipe
   // re-subscribe when restart
   override def preStart(): Unit = {
     cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[ClusterDomainEvent])
+    // note that this is artery only
+    context.system.eventStream.subscribe(self, classOf[ThisActorSystemQuarantinedEvent])
     super.preStart()
   }
   override def postStop(): Unit = {
     cluster.unsubscribe(self)
+    context.system.eventStream.unsubscribe(self, classOf[ThisActorSystemQuarantinedEvent])
     super.postStop()
   }
 
@@ -261,23 +264,24 @@ import akka.pattern.pipe
   }
 
   def receive: Receive = {
-    case SeenChanged(_, seenBy)       => seenChanged(seenBy)
-    case MemberJoined(m)              => addJoining(m)
-    case MemberWeaklyUp(m)            => addWeaklyUp(m)
-    case MemberUp(m)                  => addUp(m)
-    case MemberLeft(m)                => leaving(m)
-    case UnreachableMember(m)         => unreachableMember(m)
-    case MemberDowned(m)              => unreachableMember(m)
-    case MemberExited(m)              => unreachableMember(m)
-    case ReachableMember(m)           => reachableMember(m)
-    case ReachabilityChanged(r)       => reachabilityChanged(r)
-    case MemberRemoved(m, _)          => remove(m)
-    case UnreachableDataCenter(dc)    => unreachableDataCenter(dc)
-    case ReachableDataCenter(dc)      => reachableDataCenter(dc)
-    case LeaderChanged(leaderOption)  => leaderChanged(leaderOption)
-    case ReleaseLeaseResult(released) => releaseLeaseResult(released)
-    case Tick                         => tick()
-    case _: ClusterDomainEvent        => // not interested in other events
+    case SeenChanged(_, seenBy)                     => seenChanged(seenBy)
+    case MemberJoined(m)                            => addJoining(m)
+    case MemberWeaklyUp(m)                          => addWeaklyUp(m)
+    case MemberUp(m)                                => addUp(m)
+    case MemberLeft(m)                              => leaving(m)
+    case UnreachableMember(m)                       => unreachableMember(m)
+    case MemberDowned(m)                            => unreachableMember(m)
+    case MemberExited(m)                            => unreachableMember(m)
+    case ReachableMember(m)                         => reachableMember(m)
+    case ReachabilityChanged(r)                     => reachabilityChanged(r)
+    case MemberRemoved(m, _)                        => remove(m)
+    case UnreachableDataCenter(dc)                  => unreachableDataCenter(dc)
+    case ReachableDataCenter(dc)                    => reachableDataCenter(dc)
+    case LeaderChanged(leaderOption)                => leaderChanged(leaderOption)
+    case ReleaseLeaseResult(released)               => releaseLeaseResult(released)
+    case Tick                                       => tick()
+    case ThisActorSystemQuarantinedEvent(_, remote) => thisActorSystemWasQuarantined(remote)
+    case _: ClusterDomainEvent                      => // not interested in other events
   }
 
   private def leaderChanged(leaderOption: Option[Address]): Unit = {
@@ -346,6 +350,15 @@ import akka.pattern.pipe
     }
   }
 
+  private def thisActorSystemWasQuarantined(remoteUnique: akka.remote.UniqueAddress): Unit = {
+    val remote = UniqueAddress(remoteUnique.address, remoteUnique.uid)
+    if (Cluster(context.system).state.members.exists(m => m.uniqueAddress == remote)) {
+      actOnDecision(DowningStrategy.DownSelfQuarantinedByRemote)
+    } else {
+      log.debug("Remote [{}] quarantined this system but is not part of cluster, ignoring", remote)
+    }
+  }
+
   private def acquireLease(): Unit = {
     log.debug("SBR trying to acquire lease")
     implicit val ec: ExecutionContext = internalDispatcher
diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolverProvider.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolverProvider.scala
index a6795f479e..23c0a74c65 100644
--- a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolverProvider.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolverProvider.scala
@@ -41,20 +41,20 @@ final class SplitBrainResolverProvider(system: ActorSystem) extends DowningProvider
     val strategy = settings.DowningStrategy match {
       case KeepMajorityName =>
-        new KeepMajority(selfDc, settings.keepMajorityRole)
+        new KeepMajority(selfDc, settings.keepMajorityRole, cluster.selfUniqueAddress)
       case StaticQuorumName =>
         val s = settings.staticQuorumSettings
-        new StaticQuorum(selfDc, s.size, s.role)
+        new StaticQuorum(selfDc, s.size, s.role, cluster.selfUniqueAddress)
       case KeepOldestName =>
         val s = settings.keepOldestSettings
-        new KeepOldest(selfDc, s.downIfAlone, s.role)
+        new KeepOldest(selfDc, s.downIfAlone, s.role, cluster.selfUniqueAddress)
       case DownAllName =>
-        new DownAllNodes(selfDc)
+        new DownAllNodes(selfDc, cluster.selfUniqueAddress)
       case LeaseMajorityName =>
         val s = settings.leaseMajoritySettings
         val leaseOwnerName = cluster.selfUniqueAddress.address.hostPort
         val lease = LeaseProvider(system).getLease(s"${system.name}-akka-sbr", s.leaseImplementation, leaseOwnerName)
-        new LeaseMajority(selfDc, s.role, lease, s.acquireLeaseDelayForMinority)
+        new LeaseMajority(selfDc, s.role, lease, s.acquireLeaseDelayForMinority, cluster.selfUniqueAddress)
     }
 
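     // each strategy above now receives cluster.selfUniqueAddress; this is what lets the new
     // DownSelfQuarantinedByRemote decision resolve to exactly this node when a remote system
     // reports that it has quarantined us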
     Some(SplitBrainResolver.props(settings.DowningStableAfter, strategy))
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/DowningWhenOtherHasQuarantinedThisActorSystemSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/DowningWhenOtherHasQuarantinedThisActorSystemSpec.scala
new file mode 100644
index 0000000000..9dcffe648a
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/DowningWhenOtherHasQuarantinedThisActorSystemSpec.scala
@@ -0,0 +1,136 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */
+
+package akka.cluster
+
+import akka.actor.ActorRef
+import akka.actor.Identify
+import akka.actor.RootActorPath
+
+import scala.concurrent.duration._
+import akka.remote.artery.ArterySettings
+import akka.remote.artery.ThisActorSystemQuarantinedEvent
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.transport.ThrottlerTransportAdapter
+import akka.testkit.LongRunningTest
+import com.typesafe.config.ConfigFactory
+
+object DowningWhenOtherHasQuarantinedThisActorSystemSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+
+  commonConfig(
+    debugConfig(on = false)
+      .withFallback(MultiNodeClusterSpec.clusterConfig)
+      .withFallback(
+        ConfigFactory.parseString("""
+          akka.remote.artery.enabled = on
+          akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
+          # speed up decision
+          akka.cluster.split-brain-resolver.stable-after = 5s
+          """)))
+
+  // exaggerate the timing issue by making the second node decide slower;
+  // this is to more consistently repeat the scenario where the other side completes downing
+  // while the isolated part still has not made a decision, and then sees quarantined connections from the other nodes
+  nodeConfig(second)(ConfigFactory.parseString("akka.cluster.split-brain-resolver.stable-after = 15s"))
+
+  testTransport(on = true)
+}
+
+class DowningWhenOtherHasQuarantinedThisActorSystemMultiJvmNode1
+    extends DowningWhenOtherHasQuarantinedThisActorSystemSpec
+class DowningWhenOtherHasQuarantinedThisActorSystemMultiJvmNode2
+    extends DowningWhenOtherHasQuarantinedThisActorSystemSpec
+class DowningWhenOtherHasQuarantinedThisActorSystemMultiJvmNode3
+    extends DowningWhenOtherHasQuarantinedThisActorSystemSpec
+
+abstract class DowningWhenOtherHasQuarantinedThisActorSystemSpec
+    extends MultiNodeSpec(DowningWhenOtherHasQuarantinedThisActorSystemSpec)
+    with MultiNodeClusterSpec {
+  import DowningWhenOtherHasQuarantinedThisActorSystemSpec._
+
+  "Cluster node downed by other" must {
+
+    if (!ArterySettings(system.settings.config.getConfig("akka.remote.artery")).Enabled) {
+      // this feature only works in Artery, because classic remoting will not accept connections from
+      // a quarantined node, and that is too high risk of introducing regressions if changing that
+      pending
+    }
+
+    "join cluster" taggedAs LongRunningTest in {
+      awaitClusterUp(first, second, third)
+      enterBarrier("after-1")
+    }
+
+    "down itself" taggedAs LongRunningTest in {
+      runOn(first) {
+        testConductor.blackhole(first, second, ThrottlerTransportAdapter.Direction.Both).await
+        testConductor.blackhole(third, second, ThrottlerTransportAdapter.Direction.Both).await
+      }
+      enterBarrier("blackhole")
+
+      within(15.seconds) {
+        runOn(first) {
+          awaitAssert {
+            cluster.state.unreachable.map(_.address) should ===(Set(address(second)))
+          }
+          awaitAssert {
+            // second downed and removed
+            cluster.state.members.map(_.address) should ===(Set(address(first), address(third)))
+          }
+        }
+        runOn(second) {
+          awaitAssert {
+            cluster.state.unreachable.map(_.address) should ===(Set(address(first), address(third)))
+          }
+        }
+      }
+      enterBarrier("down-second")
+
+      runOn(first) {
+        testConductor.passThrough(first, second, ThrottlerTransportAdapter.Direction.Both).await
+        testConductor.passThrough(third, second, ThrottlerTransportAdapter.Direction.Both).await
+      }
+      enterBarrier("pass-through")
+
+      runOn(second) {
+        within(10.seconds) {
+          awaitAssert {
+            // try to ping first (Cluster Heartbeat messages will not trigger the Quarantine message)
+            system.actorSelection(RootActorPath(first) / "user").tell(Identify(None), ActorRef.noSender)
+            // shutting itself down is triggered by ThisActorSystemQuarantinedEvent
+            cluster.isTerminated should ===(true)
+          }
+        }
+      }
+
+      enterBarrier("after-2")
+    }
+
+    "not be triggered by another node shutting down" taggedAs LongRunningTest in {
+      runOn(first) {
+        system.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
+      }
+      enterBarrier("subscribing")
+
+      runOn(third) {
+        cluster.shutdown()
+      }
+
+      runOn(first) {
+        val sel = system.actorSelection(RootActorPath(third) / "user")
+        (1 to 25).foreach { _ =>
+          sel.tell(Identify(None), ActorRef.noSender) // try to ping third
+          expectNoMessage(200.millis) // no ThisActorSystemQuarantinedEvent
+        }
+      }
+
+      enterBarrier("after-3")
+    }
+
+  }
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainQuarantineSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainQuarantineSpec.scala
new file mode 100644
index 0000000000..82922951ed
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainQuarantineSpec.scala
@@ -0,0 +1,141 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */
+
+package akka.cluster
+
+import akka.actor.ActorRef
+import akka.actor.Identify
+import akka.actor.RootActorPath
+import akka.remote.artery.ArterySettings
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.transport.ThrottlerTransportAdapter
+import akka.testkit.LongRunningTest
+import com.typesafe.config.ConfigFactory
+
+import scala.concurrent.duration._
+
+object SplitBrainQuarantineSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+
+  testTransport(on = true)
+  commonConfig(
+    debugConfig(on = true)
+      .withFallback(MultiNodeClusterSpec.clusterConfig)
+      .withFallback(ConfigFactory.parseString(
+        """
+        akka.remote.artery.enabled = on
+        akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
+        # we don't really want this to hit, but we need the SBR enabled to verify that
+        # quarantining does not trigger the downing
+        akka.cluster.split-brain-resolver.stable-after = 5 minutes
+        akka.cluster.debug.verbose-gossip-logging = on
+        """)))
+}
+
+class SplitBrainQuarantineMultiJvmNode1 extends SplitBrainQuarantineSpec
+class SplitBrainQuarantineMultiJvmNode2 extends SplitBrainQuarantineSpec
+class SplitBrainQuarantineMultiJvmNode3 extends SplitBrainQuarantineSpec
+class SplitBrainQuarantineMultiJvmNode4 extends SplitBrainQuarantineSpec
+
+abstract class SplitBrainQuarantineSpec extends MultiNodeSpec(SplitBrainQuarantineSpec) with MultiNodeClusterSpec {
+  import SplitBrainQuarantineSpec._
+
+  // reproduces the scenario where the cluster is partitioned and each side (incorrectly) downs the other,
+  // and after that the partition is resolved and the two split-brain halves reconnect
+
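+  // note: with stable-after at 5 minutes the resolver cannot reach an ordinary downing decision
+  // within the test window, so a terminated node here could only mean that a stale quarantine
+  // claim was acted upon, which is exactly what must not happen
+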
"Cluster node downed by other" must { + + if (!ArterySettings(system.settings.config.getConfig("akka.remote.artery")).Enabled) { + // this feature only works in Artery, because classic remoting will not accept connections from + // a quarantined node, and that is too high risk of introducing regressions if changing that + pending + } + + "join cluster" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third, fourth) + enterBarrier("after-1") + } + + "split brain" taggedAs LongRunningTest in { + runOn(first) { + testConductor.blackhole(first, third, ThrottlerTransportAdapter.Direction.Both).await + testConductor.blackhole(first, fourth, ThrottlerTransportAdapter.Direction.Both).await + testConductor.blackhole(second, third, ThrottlerTransportAdapter.Direction.Both).await + testConductor.blackhole(second, fourth, ThrottlerTransportAdapter.Direction.Both).await + } + enterBarrier("blackhole") + system.log.info("cluster split into [JVM-1, JVM-2] and [JVM-3, JVM-4] with blackhole") + + within(15.seconds) { + runOn(first) { + cluster.down(third) + cluster.down(fourth) + } + runOn(first, second) { + awaitAssert { + cluster.state.members.collect { case m if m.status == MemberStatus.Up => m.address } should ===( + Set(address(first), address(second))) + cluster.state.members.size should ===(2) + } + system.log.info("JVM-3 and JVM-4 downed from JVM-1") + } + + runOn(third) { + cluster.down(first) + cluster.down(second) + } + runOn(third, fourth) { + awaitAssert { + cluster.state.members.collect { case m if m.status == MemberStatus.Up => m.address } should ===( + Set(address(third), address(fourth))) + cluster.state.members.size should ===(2) + } + system.log.info("JVM-1 and JVM-2 downed from JVM-3") + } + } + enterBarrier("brain-split") + + runOn(first) { + system.log.info("unblackholing cluster") + testConductor.passThrough(first, third, ThrottlerTransportAdapter.Direction.Both).await + testConductor.passThrough(first, fourth, ThrottlerTransportAdapter.Direction.Both).await + testConductor.passThrough(second, third, ThrottlerTransportAdapter.Direction.Both).await + testConductor.passThrough(second, fourth, ThrottlerTransportAdapter.Direction.Both).await + } + enterBarrier("unblackholed") + + // must send some actual messages that would trigger Quarantine to be sure it does in fact not happen + runOn(first, second) { + system.actorSelection(RootActorPath(third) / "user").tell(Identify(None), ActorRef.noSender) + system.actorSelection(RootActorPath(fourth) / "user").tell(Identify(None), ActorRef.noSender) + } + runOn(third, fourth) { + system.actorSelection(RootActorPath(first) / "user").tell(Identify(None), ActorRef.noSender) + system.actorSelection(RootActorPath(second) / "user").tell(Identify(None), ActorRef.noSender) + } + Thread.sleep(3000) + runOn(first, second) { + system.actorSelection(RootActorPath(third) / "user").tell(Identify(None), ActorRef.noSender) + system.actorSelection(RootActorPath(fourth) / "user").tell(Identify(None), ActorRef.noSender) + } + runOn(third, fourth) { + system.actorSelection(RootActorPath(first) / "user").tell(Identify(None), ActorRef.noSender) + system.actorSelection(RootActorPath(second) / "user").tell(Identify(None), ActorRef.noSender) + } + Thread.sleep(5000) + enterBarrier("after-pass-through") + + // as the side that would quarantine each node is now not a part of the cluster it is the same as + // a random node connecting and claiming a node is quarantined and therefore it cannot be trusted + // enough to trigger a 
+      cluster.isTerminated should ===(false)
+      enterBarrier("verify-alive")
+    }
+
+  }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala b/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala
index 3401d84564..5e0ea51244 100644
--- a/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala
@@ -112,6 +112,7 @@ class SplitBrainResolverSpec
   import TestAddresses._
 
   private val selfDc = TestAddresses.defaultDataCenter
+  private lazy val selfUniqueAddress = Cluster(system).selfUniqueAddress
 
   private val testLeaseSettings =
     new LeaseSettings("akka-sbr", "test", new TimeoutSettings(1.second, 2.minutes, 3.seconds), ConfigFactory.empty)
@@ -180,7 +181,7 @@ class SplitBrainResolverSpec
   "StaticQuorum" must {
     class Setup2(size: Int, role: Option[String]) extends StrategySetup {
       override def createStrategy() =
-        new StaticQuorum(selfDc, size, role)
+        new StaticQuorum(selfDc, size, role, selfUniqueAddress)
     }
 
     "down unreachable when enough reachable nodes" in new Setup2(3, None) {
@@ -257,7 +258,7 @@ class SplitBrainResolverSpec
   "KeepMajority" must {
     class Setup2(role: Option[String]) extends StrategySetup {
       override def createStrategy() =
-        new KeepMajority(selfDc, role)
+        new KeepMajority(selfDc, role, selfUniqueAddress)
     }
 
     "down minority partition: {A, C, E} | {B, D} => {A, C, E}" in new Setup2(role = None) {
@@ -593,7 +594,7 @@ class SplitBrainResolverSpec
   "KeepOldest" must {
     class Setup2(downIfAlone: Boolean = true, role: Option[String] = None) extends StrategySetup {
-      override def createStrategy() = new KeepOldest(selfDc, downIfAlone, role)
+      override def createStrategy() = new KeepOldest(selfDc, downIfAlone, role, selfUniqueAddress)
     }
 
     "keep partition with oldest" in new Setup2 {
@@ -803,7 +804,7 @@ class SplitBrainResolverSpec
   "DownAllNodes" must {
     class Setup2 extends StrategySetup {
-      override def createStrategy() = new DownAllNodes(selfDc)
+      override def createStrategy() = new DownAllNodes(selfDc, selfUniqueAddress)
     }
 
     "down all" in new Setup2 {
@@ -826,7 +827,7 @@ class SplitBrainResolverSpec
       val acquireLeaseDelayForMinority: FiniteDuration = 2.seconds
 
       override def createStrategy() =
-        new LeaseMajority(selfDc, role, testLease, acquireLeaseDelayForMinority)
+        new LeaseMajority(selfDc, role, testLease, acquireLeaseDelayForMinority, selfUniqueAddress)
     }
 
     "decide AcquireLeaseAndDownUnreachable, and DownReachable as reverse decision" in {
@@ -935,11 +936,11 @@ class SplitBrainResolverSpec
   "Strategy" must {
     class MajoritySetup(role: Option[String] = None) extends StrategySetup {
-      override def createStrategy() = new KeepMajority(selfDc, role)
+      override def createStrategy() = new KeepMajority(selfDc, role, selfUniqueAddress)
     }
 
     class OldestSetup(role: Option[String] = None) extends StrategySetup {
-      override def createStrategy() = new KeepOldest(selfDc, downIfAlone = true, role)
+      override def createStrategy() = new KeepOldest(selfDc, downIfAlone = true, role, selfUniqueAddress)
     }
 
     "add and remove members with default Member ordering" in {
@@ -1081,24 +1082,29 @@ class SplitBrainResolverSpec
       role: Option[String],
       downAllWhenUnstable: FiniteDuration = Duration.Zero,
       tickInterval: FiniteDuration = Duration.Zero)
-      extends Setup(stableAfter, new KeepMajority(selfDc, role), selfUniqueAddress, downAllWhenUnstable, tickInterval)
+      extends Setup(
+        stableAfter,
+        new KeepMajority(selfDc, role, selfUniqueAddress),
+        selfUniqueAddress,
+        downAllWhenUnstable,
+        tickInterval)
 
   class SetupKeepOldest(
       stableAfter: FiniteDuration,
       selfUniqueAddress: UniqueAddress,
      downIfAlone: Boolean,
       role: Option[String])
-      extends Setup(stableAfter, new KeepOldest(selfDc, downIfAlone, role), selfUniqueAddress)
+      extends Setup(stableAfter, new KeepOldest(selfDc, downIfAlone, role, selfUniqueAddress), selfUniqueAddress)
 
   class SetupStaticQuorum(
       stableAfter: FiniteDuration,
       selfUniqueAddress: UniqueAddress,
       size: Int,
       role: Option[String])
-      extends Setup(stableAfter, new StaticQuorum(selfDc, size, role), selfUniqueAddress)
+      extends Setup(stableAfter, new StaticQuorum(selfDc, size, role, selfUniqueAddress), selfUniqueAddress)
 
   class SetupDownAllNodes(stableAfter: FiniteDuration, selfUniqueAddress: UniqueAddress)
-      extends Setup(stableAfter, new DownAllNodes(selfDc), selfUniqueAddress)
+      extends Setup(stableAfter, new DownAllNodes(selfDc, selfUniqueAddress), selfUniqueAddress)
 
   class SetupLeaseMajority(
       stableAfter: FiniteDuration,
@@ -1109,7 +1115,7 @@ class SplitBrainResolverSpec
       tickInterval: FiniteDuration = Duration.Zero)
       extends Setup(
         stableAfter,
-        new LeaseMajority(selfDc, role, testLease, acquireLeaseDelayForMinority = 20.millis),
+        new LeaseMajority(selfDc, role, testLease, acquireLeaseDelayForMinority = 20.millis, selfUniqueAddress),
         selfUniqueAddress,
         downAllWhenUnstable,
         tickInterval)
diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
index 036f3c28f0..59bab80451 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
@@ -809,9 +809,9 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
             system.deadLetters,
             settings.Advanced.SystemMessageResendInterval,
             settings.Advanced.SysMsgBufferSize))
+        .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right)
         // note that System messages must not be dropped before the SystemMessageDelivery stage
         .via(outboundTestFlow(outboundContext))
-        .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right)
         .via(createEncoder(envelopeBufferPool, ControlStreamId))
         .toMat(outboundTransportSink(outboundContext, ControlStreamId, envelopeBufferPool))(Keep.both)
diff --git a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala
index 664c58ba19..1eee63fa89 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala
@@ -33,6 +33,8 @@ private[remote] class SharedTestState
 
   private val state = new AtomicReference[TestState](TestState(Map.empty, None))
 
+  def anyBlackholePresent(): Boolean = state.get.blackholes.nonEmpty
+
   def isBlackhole(from: Address, to: Address): Boolean =
     state.get.blackholes.get(from) match {
       case Some(destinations) => destinations(to)
@@ -161,6 +163,10 @@ private[remote] class InboundTestStage(inboundContext: InboundContext, state: Sh
         env.association match {
           case OptionVal.None =>
             // unknown, handshake not completed
+            if (state.anyBlackholePresent())
+              log.debug(
+                "inbound message [{}] before handshake completed, cannot check if remote is blackholed, letting through",
+                Logging.messageClassName(env.message))
             push(out, env)
           case OptionVal.Some(association) =>
             if (state.isBlackhole(inboundContext.localAddress.address,
                 association.remoteAddress)) {
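               // when the link is blackholed the message is presumably logged and dropped here
               // rather than pushed downstream (that branch is unchanged by this patch)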