diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/TransportFailSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/TransportFailSpec.scala
new file mode 100644
index 0000000000..883fc243c0
--- /dev/null
+++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/TransportFailSpec.scala
@@ -0,0 +1,166 @@
+/**
+ * Copyright (C) 2017 Lightbend Inc.
+ */
+package akka.remote
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import akka.actor.Actor
+import akka.actor.ActorIdentity
+import akka.actor.ActorRef
+import akka.actor.Identify
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.event.EventStream
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.testkit._
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+object TransportFailConfig extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+
+  commonConfig(debugConfig(on = false).withFallback(
+    ConfigFactory.parseString(s"""
+      akka.loglevel = INFO
+      akka.remote {
+        transport-failure-detector {
+          implementation-class = "akka.remote.TransportFailSpec$$TestFailureDetector"
+          heartbeat-interval = 1 s
+        }
+        retry-gate-closed-for = 3 s
+        # Don't trigger watch Terminated
+        watch-failure-detector.acceptable-heartbeat-pause = 60 s
+        #use-passive-connections = off
+        # This test is not interesting for Artery, no transport failure detector,
+        # but it will not fail when running with Artery enabled
+        artery.enabled = off
+      }
+      """)))
+
+}
+
+class TransportFailMultiJvmNode1 extends TransportFailSpec
+class TransportFailMultiJvmNode2 extends TransportFailSpec
+
+object TransportFailSpec {
+  class Subject extends Actor {
+    def receive = {
+      case msg ⇒ sender() ! msg
+    }
+  }
+
+  private val fdAvailable = new AtomicBoolean(true)
+
+  // FD that will fail when `fdAvailable` flag is false
+  class TestFailureDetector(config: Config, ev: EventStream) extends FailureDetector {
+    @volatile private var active = false
+
+    override def heartbeat(): Unit = {
+      active = true
+    }
+
+    override def isAvailable: Boolean = {
+      if (active) fdAvailable.get
+      else true
+    }
+
+    override def isMonitoring: Boolean = active
+  }
+}
+
+/**
+ * This was a reproducer for issue #23010.
+ * - watch from first to second node, i.e. sys msg with seq number 1
+ * - trigger transport failure detection to tear down the connection
+ * - the bug was that on the second node the ReliableDeliverySupervisor
+ *   was stopped because the send buffer had not been used on that side,
+ *   but that removed the receive buffer entry
+ * - later, after gating elapsed, another watch from first to second node,
+ *   i.e. sys msg with seq number 2
+ * - when that watch msg was received on the second node the receive buffer
+ *   had been cleared and therefore it thought that seq number 1 was missing,
+ *   and therefore sent a nack to the first node
+ * - when the first node received the nack it threw
+ *   IllegalStateException: Error encountered while processing system message
+ *   acknowledgement buffer: [2 {2}] ack: ACK[2, {1, 0}]
+ *   caused by: ResendUnfulfillableException: Unable to fulfill resend request since
+ *   negatively acknowledged payload is no longer in buffer
+ *
+ * This was fixed by not stopping the ReliableDeliverySupervisor, so that the
+ * receive buffer was preserved.
+ */
+abstract class TransportFailSpec extends RemotingMultiNodeSpec(TransportFailConfig) {
+  import TransportFailConfig._
+  import TransportFailSpec._
+
+  override def initialParticipants = roles.size
+
+  def identify(role: RoleName, actorName: String): ActorRef = {
+    system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName)
+    expectMsgType[ActorIdentity].ref.get
+  }
+
+  "TransportFail" must {
+
+    "reconnect" taggedAs LongRunningTest in {
+      runOn(first) {
+        val secondAddress = node(second).address
+        enterBarrier("actors-started")
+
+        val subject = identify(second, "subject")
+        watch(subject)
+        subject ! "hello"
+        expectMsg("hello")
+      }
+
+      runOn(second) {
+        system.actorOf(Props[Subject], "subject")
+        enterBarrier("actors-started")
+      }
+
+      enterBarrier("watch-established")
+      // trigger transport FD
+      TransportFailSpec.fdAvailable.set(false)
+
+      // wait for ungated (also later awaitAssert retry)
+      Thread.sleep(RARP(system).provider.remoteSettings.RetryGateClosedFor.toMillis)
+      TransportFailSpec.fdAvailable.set(true)
+
+      runOn(first) {
+        enterBarrier("actors-started2")
+        val quarantineProbe = TestProbe()
+        system.eventStream.subscribe(quarantineProbe.ref, classOf[QuarantinedEvent])
+
+        var subject2: ActorRef = null
+        awaitAssert({
+          within(1.second) {
+            subject2 = identify(second, "subject2")
+          }
+        }, max = 5.seconds)
+        watch(subject2)
+        quarantineProbe.expectNoMsg(1.seconds)
+        subject2 ! "hello2"
+        expectMsg("hello2")
+        enterBarrier("watch-established2")
+        expectTerminated(subject2)
+      }
+
+      runOn(second) {
+        val subject2 = system.actorOf(Props[Subject], "subject2")
+        enterBarrier("actors-started2")
+        enterBarrier("watch-established2")
+        subject2 ! PoisonPill
+      }
+
+      enterBarrier("done")
+
+    }
+
+  }
+}
diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf
index c91af3a436..3a8dfc554c 100644
--- a/akka-remote/src/main/resources/reference.conf
+++ b/akka-remote/src/main/resources/reference.conf
@@ -171,7 +171,7 @@ akka {
     # For TCP it is not important to have fast failure detection, since
     # most connection failures are captured by TCP itself.
     # The default DeadlineFailureDetector will trigger if there are no heartbeats within
-    # the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 20 seconds
+    # the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 124 seconds
     # with the default settings.
     transport-failure-detector {

@@ -189,7 +189,7 @@ akka {
      # A margin to the `heartbeat-interval` is important to be able to survive sudden,
      # occasional, pauses in heartbeat arrivals, due to for example garbage collect or
      # network drop.
-      acceptable-heartbeat-pause = 16 s
+      acceptable-heartbeat-pause = 120 s
    }

    # Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf
@@ -381,7 +381,7 @@ akka {
    # - there are no pending system messages to deliver
    #   for the amount of time configured here, the remote system will be quarantined and all state
    #   associated with it will be dropped.
-    quarantine-after-silence = 5 d
+    quarantine-after-silence = 2 d

    # This setting defines the maximum number of unacknowledged system messages
    # allowed for a remote system. If this limit is reached the remote system is
diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
index 2f6d3fd181..69799b7dcb 100644
--- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala
+++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
@@ -215,8 +215,6 @@ private[remote] class ReliableDeliverySupervisor(
   val autoResendTimer = context.system.scheduler.schedule(
     settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery)

-  private var bufferWasInUse = false
-
   override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
     case e @ (_: AssociationProblem) ⇒ Escalate
     case NonFatal(e) ⇒
@@ -225,14 +223,12 @@ private[remote] class ReliableDeliverySupervisor(
         "Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}",
         remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy)
       uidConfirmed = false // Need confirmation of UID again
-      if (bufferWasInUse) {
-        if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
-          bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
-        context.become(gated(writerTerminated = false, earlyUngateRequested = false))
-        currentHandle = None
-        context.parent ! StoppedReading(self)
-        Stop
-      } else Escalate
+      if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
+        bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
+      context.become(gated(writerTerminated = false, earlyUngateRequested = false))
+      currentHandle = None
+      context.parent ! StoppedReading(self)
+      Stop
   }

   var currentHandle: Option[AkkaProtocolHandle] = handleOrActive
@@ -242,7 +238,6 @@ private[remote] class ReliableDeliverySupervisor(

   def reset(): Unit = {
     resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize)
-    bufferWasInUse = false
     seqCounter = 0L
     bailoutAt = None
   }
@@ -385,7 +380,7 @@ private[remote] class ReliableDeliverySupervisor(
   }

   private def goToIdle(): Unit = {
-    if (bufferWasInUse && maxSilenceTimer.isEmpty)
+    if (maxSilenceTimer.isEmpty)
       maxSilenceTimer = Some(context.system.scheduler.scheduleOnce(settings.QuarantineSilentSystemTimeout, self, TooLongIdle))
     context.become(idle)
   }
@@ -427,7 +422,6 @@ private[remote] class ReliableDeliverySupervisor(
   private def tryBuffer(s: Send): Unit =
     try {
       resendBuffer = resendBuffer buffer s
-      bufferWasInUse = true
     } catch {
       case NonFatal(e) ⇒ throw new HopelessAssociation(localAddress, remoteAddress, uid, e)
     }
diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala
index a36932c089..1c3372b8ab 100644
--- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala
@@ -70,7 +70,7 @@ class RemoteConfigSpec extends AkkaSpec(
       TransportFailureDetectorImplementationClass should ===(classOf[DeadlineFailureDetector].getName)
       TransportHeartBeatInterval should ===(4.seconds)
-      TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(16.seconds)
+      TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should ===(120.seconds)
     }
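
Not part of the patch: a minimal standalone sketch of the arithmetic behind the "124 seconds" comment in the reference.conf hunk above, i.e. heartbeat-interval + acceptable-heartbeat-pause with the new defaults. It assumes only the Typesafe Config library on the classpath; the object name TransportFdDeadlineSketch is made up for illustration.

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory

object TransportFdDeadlineSketch {
  def main(args: Array[String]): Unit = {
    // Default values copied from the updated reference.conf in this diff.
    val fd = ConfigFactory.parseString(
      """
        |akka.remote.transport-failure-detector {
        |  heartbeat-interval = 4 s
        |  acceptable-heartbeat-pause = 120 s
        |}
      """.stripMargin).getConfig("akka.remote.transport-failure-detector")

    val heartbeatInterval = fd.getDuration("heartbeat-interval", TimeUnit.MILLISECONDS).millis
    val acceptablePause = fd.getDuration("acceptable-heartbeat-pause", TimeUnit.MILLISECONDS).millis

    // Per the reference.conf comment, the DeadlineFailureDetector triggers when no
    // heartbeat arrives within heartbeat-interval + acceptable-heartbeat-pause:
    // 4 s + 120 s = 124 s with these defaults.
    val deadline: FiniteDuration = heartbeatInterval + acceptablePause
    println(s"transport failure detector deadline: $deadline")
  }
}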