diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 2ec4c29119..914713edc9 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -74,7 +74,7 @@ akka { # Artery only setting # When a node has been gracefully removed, let this time pass (to allow for example # cluster singleton handover to complete) and then quarantine the removed node. - quarantine-removed-node-after=30s + quarantine-removed-node-after = 5s # If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network # split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp' diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 7f22d81a1a..806933cd44 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -5,9 +5,11 @@ package akka.cluster import scala.concurrent.duration.FiniteDuration + import akka.actor._ import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.ClusterEvent.MemberJoined import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberRemoved import akka.cluster.ClusterEvent.MemberWeaklyUp @@ -59,6 +61,8 @@ private[cluster] class ClusterRemoteWatcher( private final case class DelayedQuarantine(m: Member, previousStatus: MemberStatus) extends NoSerializationVerificationNeeded + private var pendingDelayedQuarantine: Set[UniqueAddress] = Set.empty + var clusterNodes: Set[Address] = Set.empty override def preStart(): Unit = { @@ -78,6 +82,7 @@ private[cluster] class ClusterRemoteWatcher( clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address } clusterNodes foreach takeOverResponsibility unreachable = unreachable diff clusterNodes + case MemberJoined(m) ⇒ memberJoined(m) case MemberUp(m) ⇒ memberUp(m) case MemberWeaklyUp(m) ⇒ memberUp(m) case MemberRemoved(m, previousStatus) ⇒ memberRemoved(m, previousStatus) @@ -85,8 +90,14 @@ private[cluster] class ClusterRemoteWatcher( case DelayedQuarantine(m, previousStatus) ⇒ delayedQuarantine(m, previousStatus) } + private def memberJoined(m: Member): Unit = { + if (m.address != selfAddress) + quarantineOldIncarnation(m) + } + def memberUp(m: Member): Unit = if (m.address != selfAddress) { + quarantineOldIncarnation(m) clusterNodes += m.address takeOverResponsibility(m.address) unreachable -= m.address @@ -99,8 +110,11 @@ private[cluster] class ClusterRemoteWatcher( if (previousStatus == MemberStatus.Down) { quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]") } else if (arteryEnabled) { - // don't quarantine gracefully removed members (leaving) directly, + // Don't quarantine gracefully removed members (leaving) directly, // give Cluster Singleton some time to exchange TakeOver/HandOver messages. + // If new incarnation of same host:port is seen then the quarantine of previous incarnation + // is triggered earlier. + pendingDelayedQuarantine += m.uniqueAddress import context.dispatcher context.system.scheduler.scheduleOnce(cluster.settings.QuarantineRemovedNodeAfter, self, DelayedQuarantine(m, previousStatus)) } @@ -108,10 +122,24 @@ private[cluster] class ClusterRemoteWatcher( publishAddressTerminated(m.address) } - def delayedQuarantine(m: Member, previousStatus: MemberStatus): Unit = - quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]") + def quarantineOldIncarnation(newIncarnation: Member): Unit = { + // If new incarnation of same host:port is seen then quarantine previous incarnation + if (pendingDelayedQuarantine.nonEmpty) + pendingDelayedQuarantine.find(_.address == newIncarnation.address).foreach { oldIncarnation ⇒ + pendingDelayedQuarantine -= oldIncarnation + quarantine(oldIncarnation.address, Some(oldIncarnation.longUid), + s"Cluster member removed, new incarnation joined") + } + } - override def watchNode(watchee: InternalActorRef) = + def delayedQuarantine(m: Member, previousStatus: MemberStatus): Unit = { + if (pendingDelayedQuarantine(m.uniqueAddress)) { + pendingDelayedQuarantine -= m.uniqueAddress + quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]") + } + } + + override def watchNode(watchee: InternalActorRef): Unit = if (!clusterNodes(watchee.path.address)) super.watchNode(watchee) /** diff --git a/akka-cluster/src/test/scala/akka/cluster/ResetSystemMessageSeqNrSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ResetSystemMessageSeqNrSpec.scala new file mode 100644 index 0000000000..3060c2597b --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ResetSystemMessageSeqNrSpec.scala @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster + +import scala.concurrent.duration._ + +import akka.actor.ActorIdentity +import akka.actor.Identify +import akka.actor.PoisonPill +import akka.remote.artery.ArteryMultiNodeSpec +import akka.testkit.ImplicitSender +import akka.testkit.TestActors + +/** + * Reproducer for issue #24847 + */ +class ResetSystemMessageSeqNrSpec extends ArteryMultiNodeSpec(""" + akka.loglevel = INFO + akka.actor.provider=cluster + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + """) with ImplicitSender { + + "System messages sequence numbers" should { + + "be reset when connecting to new incarnation" in { + + val sys2 = newRemoteSystem(name = Some(system.name)) + + Cluster(system).join(Cluster(system).selfAddress) + Cluster(sys2).join(Cluster(system).selfAddress) + within(10.seconds) { + awaitAssert { + Cluster(system).state.members.map(_.uniqueAddress) should ===(Set( + Cluster(system).selfUniqueAddress, Cluster(sys2).selfUniqueAddress)) + } + } + + sys2.actorOf(TestActors.echoActorProps, name = "echo1") + system.actorSelection(rootActorPath(sys2) / "user" / "echo1") ! Identify("1") + val echo1 = expectMsgType[ActorIdentity].ref.get + watch(echo1) + + sys2.actorOf(TestActors.echoActorProps, name = "echo2") + system.actorSelection(rootActorPath(sys2) / "user" / "echo2") ! Identify("2") + val echo2 = expectMsgType[ActorIdentity].ref.get + watch(echo2) + echo2 ! PoisonPill + expectTerminated(echo2) // now we know that the watch of echo1 has been established + + Cluster(sys2).leave(Cluster(sys2).selfAddress) + within(10.seconds) { + awaitAssert { + Cluster(system).state.members.map(_.uniqueAddress) should not contain Cluster(sys2).selfUniqueAddress + } + } + + expectTerminated(echo1) + shutdown(sys2) + + val sys3 = newRemoteSystem( + name = Some(system.name), + extraConfig = Some(s"akka.remote.artery.canonical.port=${Cluster(sys2).selfAddress.port.get}")) + Cluster(sys3).join(Cluster(system).selfAddress) + within(10.seconds) { + awaitAssert { + Cluster(system).state.members.map(_.uniqueAddress) should ===(Set( + Cluster(system).selfUniqueAddress, Cluster(sys3).selfUniqueAddress)) + } + } + + sys3.actorOf(TestActors.echoActorProps, name = "echo3") + system.actorSelection(rootActorPath(sys3) / "user" / "echo3") ! Identify("3") + val echo3 = expectMsgType[ActorIdentity].ref.get + watch(echo3) + + // To clearly see the reproducer for issue #24847 one could put a sleep here and observe the + // "negative acknowledgment" log messages, but it also failed on the next expectTerminated because + // the Watch message was never delivered. + + echo3 ! PoisonPill + expectTerminated(echo3) + } + + } +} diff --git a/akka-remote/src/main/mima-filters/2.5.11.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.11.backwards.excludes index 1f9cf3ab4b..9e07caf4c2 100644 --- a/akka-remote/src/main/mima-filters/2.5.11.backwards.excludes +++ b/akka-remote/src/main/mima-filters/2.5.11.backwards.excludes @@ -1,3 +1,6 @@ # #24553 eliminate test stages when not used ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.OutboundTestStage.this") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.InboundTestStage.this") \ No newline at end of file +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.InboundTestStage.this") + +# #24847 ClearSystemMessageDelivery +ProblemFilters.exclude[Problem]("akka.remote.artery.SystemMessageDelivery*") diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 2242450e8f..d9fd5a0786 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -345,10 +345,11 @@ private[remote] class Association( val state = associationState val quarantined = state.isQuarantined() + val messageIsClearSystemMessageDelivery = message.isInstanceOf[ClearSystemMessageDelivery] // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system - if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || message == ClearSystemMessageDelivery) { - if (quarantined && message != ClearSystemMessageDelivery) { + if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || messageIsClearSystemMessageDelivery) { + if (quarantined && !messageIsClearSystemMessageDelivery) { log.debug("Quarantine piercing attempt with message [{}] to [{}]", Logging.messageClassName(message), recipient.getOrElse("")) startQuarantinedIdleTimer() } @@ -360,7 +361,7 @@ private[remote] class Association( quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope) } - case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | ClearSystemMessageDelivery ⇒ + case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | _: ClearSystemMessageDelivery ⇒ // ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating if (!controlQueue.offer(outboundEnvelope)) { dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope) @@ -476,7 +477,7 @@ private[remote] class Association( clearOutboundCompression() clearInboundCompression(u) // end delivery of system messages to that incarnation after this point - send(ClearSystemMessageDelivery, OptionVal.None, OptionVal.None) + send(ClearSystemMessageDelivery(current.incarnation), OptionVal.None, OptionVal.None) // try to tell the other system that we have quarantined it sendControl(Quarantined(localAddress, peer)) startQuarantinedIdleTimer() @@ -484,11 +485,12 @@ private[remote] class Association( quarantine(reason, uid) // recursive } case Some(peer) ⇒ - log.debug( + log.info( "Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}", remoteAddress, u, peer.uid, reason) + send(ClearSystemMessageDelivery(current.incarnation - 1), OptionVal.None, OptionVal.None) case None ⇒ - log.debug( + log.info( "Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}", remoteAddress, reason) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index e4abbb3180..a56eaa590c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -42,7 +42,19 @@ private[remote] object SystemMessageDelivery { final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply - final case object ClearSystemMessageDelivery + /** + * Sent when an incarnation of an Association is quarantined. Consumed by the + * SystemMessageDelivery stage on the sending side, i.e. not sent to remote system. + * The SystemMessageDelivery stage will clear the sequence number and other state associated + * with that incarnation. + * + * The incarnation counter is bumped when the handshake is completed, so a new incarnation + * corresponds to a new UID of the remote system. + * + * The SystemMessageDelivery stage also detects that the incarnation has changed when sending or resending + * system messages. + */ + final case class ClearSystemMessageDelivery(incarnation: Int) final class GaveUpSystemMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace @@ -75,6 +87,7 @@ private[remote] class SystemMessageDelivery( private var replyObserverAttached = false private var seqNo = 0L // sequence number for the first message will be 1 + private var incarnation = outboundContext.associationState.incarnation private val unacknowledged = new ArrayDeque[OutboundEnvelope] private var resending = new ArrayDeque[OutboundEnvelope] private var resendingFromSeqNo = -1L @@ -85,6 +98,8 @@ private[remote] class SystemMessageDelivery( private def localAddress = outboundContext.localAddress private def remoteAddress = outboundContext.remoteAddress + private def remoteAddressLogParam: String = + outboundContext.associationState.uniqueRemoteAddressValue().getOrElse(remoteAddress).toString override protected def logSource: Class[_] = classOf[SystemMessageDelivery] @@ -192,6 +207,11 @@ private[remote] class SystemMessageDelivery( } } + if (incarnation != outboundContext.associationState.incarnation) { + log.debug("Noticed new incarnation of [{}] from tryResend, clear state", remoteAddressLogParam) + clear() + } + pushCopy(env) } } @@ -207,6 +227,12 @@ private[remote] class SystemMessageDelivery( outboundEnvelope.message match { case msg @ (_: SystemMessage | _: AckedDeliveryMessage) ⇒ if (unacknowledged.size < maxBufferSize) { + if (seqNo == 0) { + incarnation = outboundContext.associationState.incarnation + } else if (incarnation != outboundContext.associationState.incarnation) { + log.debug("Noticed new incarnation of [{}] from onPush, clear state", remoteAddressLogParam) + clear() + } seqNo += 1 if (unacknowledged.isEmpty) ackTimestamp = System.nanoTime() @@ -231,8 +257,11 @@ private[remote] class SystemMessageDelivery( // pass on HandshakeReq if (isAvailable(out)) pushCopy(outboundEnvelope) - case ClearSystemMessageDelivery ⇒ - clear() + case ClearSystemMessageDelivery(i) ⇒ + if (i <= incarnation) { + log.debug("Clear system message delivery of [{}]", remoteAddressLogParam) + clear() + } pull(in) case _ ⇒ // e.g. ActorSystemTerminating or ActorSelectionMessage with PriorityMessage, no need for acked delivery @@ -255,6 +284,7 @@ private[remote] class SystemMessageDelivery( private def clear(): Unit = { sendUnacknowledgedToDeadLetters() seqNo = 0L // sequence number for the first message will be 1 + incarnation = outboundContext.associationState.incarnation unacknowledged.clear() resending.clear() resendingFromSeqNo = -1L