Fix ResendUnfulfillableException after transport failure detection, #23010
Reproducer (TransportFailSpec):
* watch from first to second node, i.e. sys msg with seq number 1
* trigger transport failure detection to tear down the connection
* the bug was that on the second node the ReliableDeliverySupervisor
was stopped because the send buffer had not been used on that side,
but that removed the receive buffer entry
* later, after gating elapsed another watch from first to second node,
i.e. sys msg with seq number 2
* when that watch msg was received on the second node the receive buffer
had been cleared and therefore it thought that seq number 1 was missing,
and therefore sent nack to the first node
* when first node received the nack it thrown
IllegalStateException: Error encountered while processing system message
acknowledgement buffer: [2 {2}] ack: ACK[2, {1, 0}]
caused by: ResendUnfulfillableException: Unable to fulfill resend request since
negatively acknowledged payload is no longer in buffer
This was fixed by not stopping the ReliableDeliverySupervisor so that the
receive buffer was preserved.
Not necessary for fixing the issue, but the following config settings were adjusted:
* increased transport-failure-detector timeout to avoid tearing down the
connection too early
* reduce the quarantine-after-silence to cleanup ReliableDeliverySupervisor
actors earlier
This commit is contained in:
parent
d5155755db
commit
32f0936d17
4 changed files with 177 additions and 17 deletions
|
|
@ -0,0 +1,166 @@
|
|||
/**
|
||||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorIdentity
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Identify
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Props
|
||||
import akka.event.EventStream
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.testkit._
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object TransportFailConfig extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(
|
||||
ConfigFactory.parseString(s"""
|
||||
akka.loglevel = INFO
|
||||
akka.remote {
|
||||
transport-failure-detector {
|
||||
implementation-class = "akka.remote.TransportFailSpec$$TestFailureDetector"
|
||||
heartbeat-interval = 1 s
|
||||
}
|
||||
retry-gate-closed-for = 3 s
|
||||
# Don't trigger watch Terminated
|
||||
watch-failure-detector.acceptable-heartbeat-pause = 60 s
|
||||
#use-passive-connections = off
|
||||
# This test is not interesting for Artery, no transport failure detector
|
||||
# but it will not fail when running with Artery enabled
|
||||
artery.enabled = off
|
||||
}
|
||||
""")))
|
||||
|
||||
}
|
||||
|
||||
class TransportFailMultiJvmNode1 extends TransportFailSpec
|
||||
class TransportFailMultiJvmNode2 extends TransportFailSpec
|
||||
|
||||
object TransportFailSpec {
|
||||
class Subject extends Actor {
|
||||
def receive = {
|
||||
case msg ⇒ sender() ! msg
|
||||
}
|
||||
}
|
||||
|
||||
private val fdAvailable = new AtomicBoolean(true)
|
||||
|
||||
// FD that will fail when `fdAvailable` flag is false
|
||||
class TestFailureDetector(config: Config, ev: EventStream) extends FailureDetector {
|
||||
@volatile private var active = false
|
||||
|
||||
override def heartbeat(): Unit = {
|
||||
active = true
|
||||
}
|
||||
|
||||
override def isAvailable: Boolean = {
|
||||
if (active) fdAvailable.get
|
||||
else true
|
||||
}
|
||||
|
||||
override def isMonitoring: Boolean = active
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This was a reproducer for issue #23010.
|
||||
* - watch from first to second node, i.e. sys msg with seq number 1
|
||||
* - trigger transport failure detection to tear down the connection
|
||||
* - the bug was that on the second node the ReliableDeliverySupervisor
|
||||
* was stopped because the send buffer had not been used on that side,
|
||||
* but that removed the receive buffer entry
|
||||
* - later, after gating elapsed another watch from first to second node,
|
||||
* i.e. sys msg with seq number 2
|
||||
* - when that watch msg was received on the second node the receive buffer
|
||||
* had been cleared and therefore it thought that seq number 1 was missing,
|
||||
* and therefore sent nack to the first node
|
||||
* - when first node received the nack it thrown
|
||||
* IllegalStateException: Error encountered while processing system message
|
||||
* acknowledgement buffer: [2 {2}] ack: ACK[2, {1, 0}]
|
||||
* caused by: ResendUnfulfillableException: Unable to fulfill resend request since
|
||||
* negatively acknowledged payload is no longer in buffer
|
||||
*
|
||||
* This was fixed by not stopping the ReliableDeliverySupervisor so that the
|
||||
* receive buffer was preserved.
|
||||
*/
|
||||
abstract class TransportFailSpec extends RemotingMultiNodeSpec(TransportFailConfig) {
|
||||
import TransportFailConfig._
|
||||
import TransportFailSpec._
|
||||
|
||||
override def initialParticipants = roles.size
|
||||
|
||||
def identify(role: RoleName, actorName: String): ActorRef = {
|
||||
system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName)
|
||||
expectMsgType[ActorIdentity].ref.get
|
||||
}
|
||||
|
||||
"TransportFail" must {
|
||||
|
||||
"reconnect" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
val secondAddress = node(second).address
|
||||
enterBarrier("actors-started")
|
||||
|
||||
val subject = identify(second, "subject")
|
||||
watch(subject)
|
||||
subject ! "hello"
|
||||
expectMsg("hello")
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
system.actorOf(Props[Subject], "subject")
|
||||
enterBarrier("actors-started")
|
||||
}
|
||||
|
||||
enterBarrier("watch-established")
|
||||
// trigger transport FD
|
||||
TransportFailSpec.fdAvailable.set(false)
|
||||
|
||||
// wait for ungated (also later awaitAssert retry)
|
||||
Thread.sleep(RARP(system).provider.remoteSettings.RetryGateClosedFor.toMillis)
|
||||
TransportFailSpec.fdAvailable.set(true)
|
||||
|
||||
runOn(first) {
|
||||
enterBarrier("actors-started2")
|
||||
val quarantineProbe = TestProbe()
|
||||
system.eventStream.subscribe(quarantineProbe.ref, classOf[QuarantinedEvent])
|
||||
|
||||
var subject2: ActorRef = null
|
||||
awaitAssert({
|
||||
within(1.second) {
|
||||
subject2 = identify(second, "subject2")
|
||||
}
|
||||
}, max = 5.seconds)
|
||||
watch(subject2)
|
||||
quarantineProbe.expectNoMsg(1.seconds)
|
||||
subject2 ! "hello2"
|
||||
expectMsg("hello2")
|
||||
enterBarrier("watch-established2")
|
||||
expectTerminated(subject2)
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
val subject2 = system.actorOf(Props[Subject], "subject2")
|
||||
enterBarrier("actors-started2")
|
||||
enterBarrier("watch-established2")
|
||||
subject2 ! PoisonPill
|
||||
}
|
||||
|
||||
enterBarrier("done")
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -171,7 +171,7 @@ akka {
|
|||
# For TCP it is not important to have fast failure detection, since
|
||||
# most connection failures are captured by TCP itself.
|
||||
# The default DeadlineFailureDetector will trigger if there are no heartbeats within
|
||||
# the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 20 seconds
|
||||
# the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 124 seconds
|
||||
# with the default settings.
|
||||
transport-failure-detector {
|
||||
|
||||
|
|
@ -189,7 +189,7 @@ akka {
|
|||
# A margin to the `heartbeat-interval` is important to be able to survive sudden,
|
||||
# occasional, pauses in heartbeat arrivals, due to for example garbage collect or
|
||||
# network drop.
|
||||
acceptable-heartbeat-pause = 16 s
|
||||
acceptable-heartbeat-pause = 120 s
|
||||
}
|
||||
|
||||
# Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf
|
||||
|
|
@ -381,7 +381,7 @@ akka {
|
|||
# - there are no pending system messages to deliver
|
||||
# for the amount of time configured here, the remote system will be quarantined and all state
|
||||
# associated with it will be dropped.
|
||||
quarantine-after-silence = 5 d
|
||||
quarantine-after-silence = 2 d
|
||||
|
||||
# This setting defines the maximum number of unacknowledged system messages
|
||||
# allowed for a remote system. If this limit is reached the remote system is
|
||||
|
|
|
|||
|
|
@ -215,8 +215,6 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
val autoResendTimer = context.system.scheduler.schedule(
|
||||
settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
|
||||
|
||||
private var bufferWasInUse = false
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
|
||||
case e @ (_: AssociationProblem) ⇒ Escalate
|
||||
case NonFatal(e) ⇒
|
||||
|
|
@ -225,14 +223,12 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
"Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy)
|
||||
uidConfirmed = false // Need confirmation of UID again
|
||||
if (bufferWasInUse) {
|
||||
if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
|
||||
bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
|
||||
context.become(gated(writerTerminated = false, earlyUngateRequested = false))
|
||||
currentHandle = None
|
||||
context.parent ! StoppedReading(self)
|
||||
Stop
|
||||
} else Escalate
|
||||
if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
|
||||
bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
|
||||
context.become(gated(writerTerminated = false, earlyUngateRequested = false))
|
||||
currentHandle = None
|
||||
context.parent ! StoppedReading(self)
|
||||
Stop
|
||||
}
|
||||
|
||||
var currentHandle: Option[AkkaProtocolHandle] = handleOrActive
|
||||
|
|
@ -242,7 +238,6 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
|
||||
def reset(): Unit = {
|
||||
resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize)
|
||||
bufferWasInUse = false
|
||||
seqCounter = 0L
|
||||
bailoutAt = None
|
||||
}
|
||||
|
|
@ -385,7 +380,7 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
}
|
||||
|
||||
private def goToIdle(): Unit = {
|
||||
if (bufferWasInUse && maxSilenceTimer.isEmpty)
|
||||
if (maxSilenceTimer.isEmpty)
|
||||
maxSilenceTimer = Some(context.system.scheduler.scheduleOnce(settings.QuarantineSilentSystemTimeout, self, TooLongIdle))
|
||||
context.become(idle)
|
||||
}
|
||||
|
|
@ -427,7 +422,6 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
private def tryBuffer(s: Send): Unit =
|
||||
try {
|
||||
resendBuffer = resendBuffer buffer s
|
||||
bufferWasInUse = true
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ throw new HopelessAssociation(localAddress, remoteAddress, uid, e)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ class RemoteConfigSpec extends AkkaSpec(
|
|||
|
||||
TransportFailureDetectorImplementationClass should ===(classOf[DeadlineFailureDetector].getName)
|
||||
TransportHeartBeatInterval should ===(4.seconds)
|
||||
TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(16.seconds)
|
||||
TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(120.seconds)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue