Clear system messages sequence number for restarted node, #24847
* Notice that the incarnation has changed in SystemMessageDelivery and then reset the sequence number
* Take the incarnation number into account in the ClearSystemMessageDelivery message
* Trigger quarantine earlier in ClusterRemoteWatcher if node with same host:port joined
* Change quarantine-removed-node-after to 5s, shouldn't be necessary to delay it 30s
* test reproducer
This commit is contained in:
parent 85026f5f1d
commit 43dc381d59
6 changed files with 165 additions and 15 deletions
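The gist of the change, as a minimal self-contained sketch (ClearDelivery, DeliveryState and currentIncarnation are illustrative stand-ins, not the real akka.remote.artery classes): the clear request now carries the incarnation it was issued for, and the sender-side delivery state resets its sequence number when such a request targets its own or an older incarnation, as well as whenever it notices on send or resend that the incarnation has changed.

final case class ClearDelivery(incarnation: Int)

final class DeliveryState(currentIncarnation: () => Int) {
  private var incarnation: Int = currentIncarnation()
  private var seqNo: Long = 0L // sequence number for the first message will be 1

  // Mirrors the onPush/tryResend handling in the diff below: adopt the current
  // incarnation on the first message after a clear, otherwise clear when a new
  // incarnation is noticed, then hand out the next sequence number.
  def nextSeqNo(): Long = {
    val latest = currentIncarnation()
    if (seqNo == 0L) incarnation = latest
    else if (incarnation != latest) clear()
    seqNo += 1
    seqNo
  }

  // Mirrors the ClearSystemMessageDelivery(i) handling: apply the clear unless
  // it is tagged for a newer incarnation than the one currently tracked.
  def onClear(msg: ClearDelivery): Unit =
    if (msg.incarnation <= incarnation) clear()

  private def clear(): Unit = {
    seqNo = 0L
    incarnation = currentIncarnation()
  }
}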
@@ -74,7 +74,7 @@ akka {
    # Artery only setting
    # When a node has been gracefully removed, let this time pass (to allow for example
    # cluster singleton handover to complete) and then quarantine the removed node.
    quarantine-removed-node-after=30s
    quarantine-removed-node-after = 5s

    # If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network
    # split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp'
@@ -5,9 +5,11 @@
package akka.cluster

import scala.concurrent.duration.FiniteDuration

import akka.actor._
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberJoined
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ClusterEvent.MemberWeaklyUp
@@ -59,6 +61,8 @@ private[cluster] class ClusterRemoteWatcher(

  private final case class DelayedQuarantine(m: Member, previousStatus: MemberStatus) extends NoSerializationVerificationNeeded

  private var pendingDelayedQuarantine: Set[UniqueAddress] = Set.empty

  var clusterNodes: Set[Address] = Set.empty

  override def preStart(): Unit = {
@@ -78,6 +82,7 @@ private[cluster] class ClusterRemoteWatcher(
      clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
      clusterNodes foreach takeOverResponsibility
      unreachable = unreachable diff clusterNodes
    case MemberJoined(m) ⇒ memberJoined(m)
    case MemberUp(m) ⇒ memberUp(m)
    case MemberWeaklyUp(m) ⇒ memberUp(m)
    case MemberRemoved(m, previousStatus) ⇒ memberRemoved(m, previousStatus)
@@ -85,8 +90,14 @@ private[cluster] class ClusterRemoteWatcher(
    case DelayedQuarantine(m, previousStatus) ⇒ delayedQuarantine(m, previousStatus)
  }

  private def memberJoined(m: Member): Unit = {
    if (m.address != selfAddress)
      quarantineOldIncarnation(m)
  }

  def memberUp(m: Member): Unit =
    if (m.address != selfAddress) {
      quarantineOldIncarnation(m)
      clusterNodes += m.address
      takeOverResponsibility(m.address)
      unreachable -= m.address
@@ -99,8 +110,11 @@ private[cluster] class ClusterRemoteWatcher(
    if (previousStatus == MemberStatus.Down) {
      quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
    } else if (arteryEnabled) {
      // don't quarantine gracefully removed members (leaving) directly,
      // Don't quarantine gracefully removed members (leaving) directly,
      // give Cluster Singleton some time to exchange TakeOver/HandOver messages.
      // If new incarnation of same host:port is seen then the quarantine of previous incarnation
      // is triggered earlier.
      pendingDelayedQuarantine += m.uniqueAddress
      import context.dispatcher
      context.system.scheduler.scheduleOnce(cluster.settings.QuarantineRemovedNodeAfter, self, DelayedQuarantine(m, previousStatus))
    }
@@ -108,10 +122,24 @@ private[cluster] class ClusterRemoteWatcher(
      publishAddressTerminated(m.address)
    }

  def delayedQuarantine(m: Member, previousStatus: MemberStatus): Unit =
    quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
  def quarantineOldIncarnation(newIncarnation: Member): Unit = {
    // If new incarnation of same host:port is seen then quarantine previous incarnation
    if (pendingDelayedQuarantine.nonEmpty)
      pendingDelayedQuarantine.find(_.address == newIncarnation.address).foreach { oldIncarnation ⇒
        pendingDelayedQuarantine -= oldIncarnation
        quarantine(oldIncarnation.address, Some(oldIncarnation.longUid),
          s"Cluster member removed, new incarnation joined")
      }
  }

  override def watchNode(watchee: InternalActorRef) =
  def delayedQuarantine(m: Member, previousStatus: MemberStatus): Unit = {
    if (pendingDelayedQuarantine(m.uniqueAddress)) {
      pendingDelayedQuarantine -= m.uniqueAddress
      quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
    }
  }

  override def watchNode(watchee: InternalActorRef): Unit =
    if (!clusterNodes(watchee.path.address)) super.watchNode(watchee)

  /**
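A short sketch of the ClusterRemoteWatcher behaviour above (NodeId and PendingQuarantine are hypothetical names, not the real akka.cluster types): gracefully removed members are parked and quarantined after a delay, but if a new incarnation with the same host:port joins first, the old incarnation is quarantined immediately rather than waiting out the delay.

final case class NodeId(host: String, port: Int, uid: Long)

final class PendingQuarantine(quarantine: NodeId => Unit) {
  private var pending: Set[NodeId] = Set.empty

  // MemberRemoved with a previous status other than Down: defer the quarantine.
  def memberLeftGracefully(node: NodeId): Unit =
    pending += node // the real code also schedules a DelayedQuarantine message

  // MemberJoined/MemberUp of a new incarnation: quarantine the old one right away.
  def newIncarnationJoined(node: NodeId): Unit =
    pending.find(old => old.host == node.host && old.port == node.port && old.uid != node.uid).foreach { old =>
      pending -= old
      quarantine(old)
    }

  // The scheduled DelayedQuarantine fires only for nodes that are still pending.
  def delayedTick(node: NodeId): Unit =
    if (pending(node)) {
      pending -= node
      quarantine(node)
    }
}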
@@ -0,0 +1,87 @@
/*
 * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster

import scala.concurrent.duration._

import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.actor.PoisonPill
import akka.remote.artery.ArteryMultiNodeSpec
import akka.testkit.ImplicitSender
import akka.testkit.TestActors

/**
 * Reproducer for issue #24847
 */
class ResetSystemMessageSeqNrSpec extends ArteryMultiNodeSpec("""
    akka.loglevel = INFO
    akka.actor.provider=cluster
    akka.cluster.jmx.multi-mbeans-in-same-jvm = on
  """) with ImplicitSender {

  "System messages sequence numbers" should {

    "be reset when connecting to new incarnation" in {

      val sys2 = newRemoteSystem(name = Some(system.name))

      Cluster(system).join(Cluster(system).selfAddress)
      Cluster(sys2).join(Cluster(system).selfAddress)
      within(10.seconds) {
        awaitAssert {
          Cluster(system).state.members.map(_.uniqueAddress) should ===(Set(
            Cluster(system).selfUniqueAddress, Cluster(sys2).selfUniqueAddress))
        }
      }

      sys2.actorOf(TestActors.echoActorProps, name = "echo1")
      system.actorSelection(rootActorPath(sys2) / "user" / "echo1") ! Identify("1")
      val echo1 = expectMsgType[ActorIdentity].ref.get
      watch(echo1)

      sys2.actorOf(TestActors.echoActorProps, name = "echo2")
      system.actorSelection(rootActorPath(sys2) / "user" / "echo2") ! Identify("2")
      val echo2 = expectMsgType[ActorIdentity].ref.get
      watch(echo2)
      echo2 ! PoisonPill
      expectTerminated(echo2) // now we know that the watch of echo1 has been established

      Cluster(sys2).leave(Cluster(sys2).selfAddress)
      within(10.seconds) {
        awaitAssert {
          Cluster(system).state.members.map(_.uniqueAddress) should not contain Cluster(sys2).selfUniqueAddress
        }
      }

      expectTerminated(echo1)
      shutdown(sys2)

      val sys3 = newRemoteSystem(
        name = Some(system.name),
        extraConfig = Some(s"akka.remote.artery.canonical.port=${Cluster(sys2).selfAddress.port.get}"))
      Cluster(sys3).join(Cluster(system).selfAddress)
      within(10.seconds) {
        awaitAssert {
          Cluster(system).state.members.map(_.uniqueAddress) should ===(Set(
            Cluster(system).selfUniqueAddress, Cluster(sys3).selfUniqueAddress))
        }
      }

      sys3.actorOf(TestActors.echoActorProps, name = "echo3")
      system.actorSelection(rootActorPath(sys3) / "user" / "echo3") ! Identify("3")
      val echo3 = expectMsgType[ActorIdentity].ref.get
      watch(echo3)

      // To clearly see the reproducer for issue #24847 one could put a sleep here and observe the
      // "negative acknowledgment" log messages, but it also failed on the next expectTerminated because
      // the Watch message was never delivered.

      echo3 ! PoisonPill
      expectTerminated(echo3)
    }

  }
}
@@ -1,3 +1,6 @@
# #24553 eliminate test stages when not used
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.OutboundTestStage.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.InboundTestStage.this")

# #24847 ClearSystemMessageDelivery
ProblemFilters.exclude[Problem]("akka.remote.artery.SystemMessageDelivery*")
@@ -345,10 +345,11 @@ private[remote] class Association(

    val state = associationState
    val quarantined = state.isQuarantined()
    val messageIsClearSystemMessageDelivery = message.isInstanceOf[ClearSystemMessageDelivery]

    // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
    if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || message == ClearSystemMessageDelivery) {
      if (quarantined && message != ClearSystemMessageDelivery) {
    if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || messageIsClearSystemMessageDelivery) {
      if (quarantined && !messageIsClearSystemMessageDelivery) {
        log.debug("Quarantine piercing attempt with message [{}] to [{}]", Logging.messageClassName(message), recipient.getOrElse(""))
        startQuarantinedIdleTimer()
      }
@@ -360,7 +361,7 @@ private[remote] class Association(
          quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
          dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
        }
      case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | ClearSystemMessageDelivery ⇒
      case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | _: ClearSystemMessageDelivery ⇒
        // ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating
        if (!controlQueue.offer(outboundEnvelope)) {
          dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
@@ -476,7 +477,7 @@ private[remote] class Association(
          clearOutboundCompression()
          clearInboundCompression(u)
          // end delivery of system messages to that incarnation after this point
          send(ClearSystemMessageDelivery, OptionVal.None, OptionVal.None)
          send(ClearSystemMessageDelivery(current.incarnation), OptionVal.None, OptionVal.None)
          // try to tell the other system that we have quarantined it
          sendControl(Quarantined(localAddress, peer))
          startQuarantinedIdleTimer()
@@ -484,11 +485,12 @@ private[remote] class Association(
            quarantine(reason, uid) // recursive
          }
        case Some(peer) ⇒
          log.debug(
          log.info(
            "Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}",
            remoteAddress, u, peer.uid, reason)
          send(ClearSystemMessageDelivery(current.incarnation - 1), OptionVal.None, OptionVal.None)
        case None ⇒
          log.debug(
          log.info(
            "Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}",
            remoteAddress, reason)
      }
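As a rough sketch of the send-time gate in the Association hunks above (QuarantineGate and the Msg hierarchy are made-up stand-ins, not the real envelope types): a quarantined association still lets ActorSelectionMessage and the locally consumed clear request through, everything else is only sent while not quarantined.

object QuarantineGate {
  sealed trait Msg
  final case class Selection(payload: Any) extends Msg
  final case class Clear(incarnation: Int) extends Msg
  final case class Other(payload: Any) extends Msg

  def mayPass(quarantined: Boolean, msg: Msg): Boolean =
    msg match {
      case _: Selection => true         // allowed through to establish interaction with the new system
      case _: Clear     => true         // consumed locally by the delivery stage
      case _            => !quarantined // ordinary traffic only when not quarantined
    }
}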
@@ -42,7 +42,19 @@ private[remote] object SystemMessageDelivery {
  final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply
  final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply

  final case object ClearSystemMessageDelivery
  /**
   * Sent when an incarnation of an Association is quarantined. Consumed by the
   * SystemMessageDelivery stage on the sending side, i.e. not sent to remote system.
   * The SystemMessageDelivery stage will clear the sequence number and other state associated
   * with that incarnation.
   *
   * The incarnation counter is bumped when the handshake is completed, so a new incarnation
   * corresponds to a new UID of the remote system.
   *
   * The SystemMessageDelivery stage also detects that the incarnation has changed when sending or resending
   * system messages.
   */
  final case class ClearSystemMessageDelivery(incarnation: Int)

  final class GaveUpSystemMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace
@@ -75,6 +87,7 @@ private[remote] class SystemMessageDelivery(

      private var replyObserverAttached = false
      private var seqNo = 0L // sequence number for the first message will be 1
      private var incarnation = outboundContext.associationState.incarnation
      private val unacknowledged = new ArrayDeque[OutboundEnvelope]
      private var resending = new ArrayDeque[OutboundEnvelope]
      private var resendingFromSeqNo = -1L
@@ -85,6 +98,8 @@ private[remote] class SystemMessageDelivery(

      private def localAddress = outboundContext.localAddress
      private def remoteAddress = outboundContext.remoteAddress
      private def remoteAddressLogParam: String =
        outboundContext.associationState.uniqueRemoteAddressValue().getOrElse(remoteAddress).toString

      override protected def logSource: Class[_] = classOf[SystemMessageDelivery]
@@ -192,6 +207,11 @@ private[remote] class SystemMessageDelivery(
            }
          }

          if (incarnation != outboundContext.associationState.incarnation) {
            log.debug("Noticed new incarnation of [{}] from tryResend, clear state", remoteAddressLogParam)
            clear()
          }

          pushCopy(env)
        }
      }
@@ -207,6 +227,12 @@ private[remote] class SystemMessageDelivery(
        outboundEnvelope.message match {
          case msg @ (_: SystemMessage | _: AckedDeliveryMessage) ⇒
            if (unacknowledged.size < maxBufferSize) {
              if (seqNo == 0) {
                incarnation = outboundContext.associationState.incarnation
              } else if (incarnation != outboundContext.associationState.incarnation) {
                log.debug("Noticed new incarnation of [{}] from onPush, clear state", remoteAddressLogParam)
                clear()
              }
              seqNo += 1
              if (unacknowledged.isEmpty)
                ackTimestamp = System.nanoTime()
@@ -231,8 +257,11 @@ private[remote] class SystemMessageDelivery(
            // pass on HandshakeReq
            if (isAvailable(out))
              pushCopy(outboundEnvelope)
          case ClearSystemMessageDelivery ⇒
          case ClearSystemMessageDelivery(i) ⇒
            if (i <= incarnation) {
              log.debug("Clear system message delivery of [{}]", remoteAddressLogParam)
              clear()
            }
            pull(in)
          case _ ⇒
            // e.g. ActorSystemTerminating or ActorSelectionMessage with PriorityMessage, no need for acked delivery
@@ -255,6 +284,7 @@ private[remote] class SystemMessageDelivery(
      private def clear(): Unit = {
        sendUnacknowledgedToDeadLetters()
        seqNo = 0L // sequence number for the first message will be 1
        incarnation = outboundContext.associationState.incarnation
        unacknowledged.clear()
        resending.clear()
        resendingFromSeqNo = -1L