From 32f0936d1730f09ac3201dec29c357cb0549d9cb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 9 Jun 2017 08:10:44 +0200 Subject: [PATCH] Fix ResendUnfulfillableException after transport failure detection, #23010 Reproducer (TransportFailSpec): * watch from first to second node, i.e. sys msg with seq number 1 * trigger transport failure detection to tear down the connection * the bug was that on the second node the ReliableDeliverySupervisor was stopped because the send buffer had not been used on that side, but that removed the receive buffer entry * later, after gating elapsed another watch from first to second node, i.e. sys msg with seq number 2 * when that watch msg was received on the second node the receive buffer had been cleared and therefore it thought that seq number 1 was missing, and therefore sent nack to the first node * when first node received the nack it thrown IllegalStateException: Error encountered while processing system message acknowledgement buffer: [2 {2}] ack: ACK[2, {1, 0}] caused by: ResendUnfulfillableException: Unable to fulfill resend request since negatively acknowledged payload is no longer in buffer This was fixed by not stopping the ReliableDeliverySupervisor so that the receive buffer was preserved. Not necessary for fixing the issue, but the following config settings were adjusted: * increased transport-failure-detector timeout to avoid tearing down the connection too early * reduce the quarantine-after-silence to cleanup ReliableDeliverySupervisor actors earlier --- .../scala/akka/remote/TransportFailSpec.scala | 166 ++++++++++++++++++ akka-remote/src/main/resources/reference.conf | 6 +- .../src/main/scala/akka/remote/Endpoint.scala | 20 +-- .../scala/akka/remote/RemoteConfigSpec.scala | 2 +- 4 files changed, 177 insertions(+), 17 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/TransportFailSpec.scala diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/TransportFailSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/TransportFailSpec.scala new file mode 100644 index 0000000000..883fc243c0 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/TransportFailSpec.scala @@ -0,0 +1,166 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.remote + +import java.util.concurrent.atomic.AtomicBoolean + +import scala.concurrent.duration._ +import scala.language.postfixOps + +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.ActorRef +import akka.actor.Identify +import akka.actor.PoisonPill +import akka.actor.Props +import akka.event.EventStream +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.testkit._ +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +object TransportFailConfig extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.remote { + transport-failure-detector { + implementation-class = "akka.remote.TransportFailSpec$$TestFailureDetector" + heartbeat-interval = 1 s + } + retry-gate-closed-for = 3 s + # Don't trigger watch Terminated + watch-failure-detector.acceptable-heartbeat-pause = 60 s + #use-passive-connections = off + # This test is not interesting for Artery, no transport failure detector + # but it will not fail when running with Artery enabled + artery.enabled = off + } + """))) + +} + +class TransportFailMultiJvmNode1 extends TransportFailSpec +class TransportFailMultiJvmNode2 extends TransportFailSpec + +object TransportFailSpec { + class Subject extends Actor { + def receive = { + case msg ⇒ sender() ! msg + } + } + + private val fdAvailable = new AtomicBoolean(true) + + // FD that will fail when `fdAvailable` flag is false + class TestFailureDetector(config: Config, ev: EventStream) extends FailureDetector { + @volatile private var active = false + + override def heartbeat(): Unit = { + active = true + } + + override def isAvailable: Boolean = { + if (active) fdAvailable.get + else true + } + + override def isMonitoring: Boolean = active + } +} + +/** + * This was a reproducer for issue #23010. + * - watch from first to second node, i.e. sys msg with seq number 1 + * - trigger transport failure detection to tear down the connection + * - the bug was that on the second node the ReliableDeliverySupervisor + * was stopped because the send buffer had not been used on that side, + * but that removed the receive buffer entry + * - later, after gating elapsed another watch from first to second node, + * i.e. sys msg with seq number 2 + * - when that watch msg was received on the second node the receive buffer + * had been cleared and therefore it thought that seq number 1 was missing, + * and therefore sent nack to the first node + * - when first node received the nack it thrown + * IllegalStateException: Error encountered while processing system message + * acknowledgement buffer: [2 {2}] ack: ACK[2, {1, 0}] + * caused by: ResendUnfulfillableException: Unable to fulfill resend request since + * negatively acknowledged payload is no longer in buffer + * + * This was fixed by not stopping the ReliableDeliverySupervisor so that the + * receive buffer was preserved. + */ +abstract class TransportFailSpec extends RemotingMultiNodeSpec(TransportFailConfig) { + import TransportFailConfig._ + import TransportFailSpec._ + + override def initialParticipants = roles.size + + def identify(role: RoleName, actorName: String): ActorRef = { + system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName) + expectMsgType[ActorIdentity].ref.get + } + + "TransportFail" must { + + "reconnect" taggedAs LongRunningTest in { + runOn(first) { + val secondAddress = node(second).address + enterBarrier("actors-started") + + val subject = identify(second, "subject") + watch(subject) + subject ! "hello" + expectMsg("hello") + } + + runOn(second) { + system.actorOf(Props[Subject], "subject") + enterBarrier("actors-started") + } + + enterBarrier("watch-established") + // trigger transport FD + TransportFailSpec.fdAvailable.set(false) + + // wait for ungated (also later awaitAssert retry) + Thread.sleep(RARP(system).provider.remoteSettings.RetryGateClosedFor.toMillis) + TransportFailSpec.fdAvailable.set(true) + + runOn(first) { + enterBarrier("actors-started2") + val quarantineProbe = TestProbe() + system.eventStream.subscribe(quarantineProbe.ref, classOf[QuarantinedEvent]) + + var subject2: ActorRef = null + awaitAssert({ + within(1.second) { + subject2 = identify(second, "subject2") + } + }, max = 5.seconds) + watch(subject2) + quarantineProbe.expectNoMsg(1.seconds) + subject2 ! "hello2" + expectMsg("hello2") + enterBarrier("watch-established2") + expectTerminated(subject2) + } + + runOn(second) { + val subject2 = system.actorOf(Props[Subject], "subject2") + enterBarrier("actors-started2") + enterBarrier("watch-established2") + subject2 ! PoisonPill + } + + enterBarrier("done") + + } + + } +} diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index c91af3a436..3a8dfc554c 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -171,7 +171,7 @@ akka { # For TCP it is not important to have fast failure detection, since # most connection failures are captured by TCP itself. # The default DeadlineFailureDetector will trigger if there are no heartbeats within - # the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 20 seconds + # the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 124 seconds # with the default settings. transport-failure-detector { @@ -189,7 +189,7 @@ akka { # A margin to the `heartbeat-interval` is important to be able to survive sudden, # occasional, pauses in heartbeat arrivals, due to for example garbage collect or # network drop. - acceptable-heartbeat-pause = 16 s + acceptable-heartbeat-pause = 120 s } # Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf @@ -381,7 +381,7 @@ akka { # - there are no pending system messages to deliver # for the amount of time configured here, the remote system will be quarantined and all state # associated with it will be dropped. - quarantine-after-silence = 5 d + quarantine-after-silence = 2 d # This setting defines the maximum number of unacknowledged system messages # allowed for a remote system. If this limit is reached the remote system is diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 2f6d3fd181..69799b7dcb 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -215,8 +215,6 @@ private[remote] class ReliableDeliverySupervisor( val autoResendTimer = context.system.scheduler.schedule( settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery) - private var bufferWasInUse = false - override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case e @ (_: AssociationProblem) ⇒ Escalate case NonFatal(e) ⇒ @@ -225,14 +223,12 @@ private[remote] class ReliableDeliverySupervisor( "Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}", remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy) uidConfirmed = false // Need confirmation of UID again - if (bufferWasInUse) { - if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty) - bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout) - context.become(gated(writerTerminated = false, earlyUngateRequested = false)) - currentHandle = None - context.parent ! StoppedReading(self) - Stop - } else Escalate + if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty) + bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout) + context.become(gated(writerTerminated = false, earlyUngateRequested = false)) + currentHandle = None + context.parent ! StoppedReading(self) + Stop } var currentHandle: Option[AkkaProtocolHandle] = handleOrActive @@ -242,7 +238,6 @@ private[remote] class ReliableDeliverySupervisor( def reset(): Unit = { resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize) - bufferWasInUse = false seqCounter = 0L bailoutAt = None } @@ -385,7 +380,7 @@ private[remote] class ReliableDeliverySupervisor( } private def goToIdle(): Unit = { - if (bufferWasInUse && maxSilenceTimer.isEmpty) + if (maxSilenceTimer.isEmpty) maxSilenceTimer = Some(context.system.scheduler.scheduleOnce(settings.QuarantineSilentSystemTimeout, self, TooLongIdle)) context.become(idle) } @@ -427,7 +422,6 @@ private[remote] class ReliableDeliverySupervisor( private def tryBuffer(s: Send): Unit = try { resendBuffer = resendBuffer buffer s - bufferWasInUse = true } catch { case NonFatal(e) ⇒ throw new HopelessAssociation(localAddress, remoteAddress, uid, e) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index a36932c089..1c3372b8ab 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -70,7 +70,7 @@ class RemoteConfigSpec extends AkkaSpec( TransportFailureDetectorImplementationClass should ===(classOf[DeadlineFailureDetector].getName) TransportHeartBeatInterval should ===(4.seconds) - TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(16.seconds) + TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(120.seconds) }