From c4e326c9dd60e69c35b6f658fe0786456255bb68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 16 Sep 2015 15:26:24 +0200 Subject: [PATCH] +rem #18353: Prune reliable deliver actors (cherry picked from commit 6643f56) --- akka-remote/src/main/resources/reference.conf | 15 ++ .../src/main/scala/akka/remote/Endpoint.scala | 48 +++-- .../scala/akka/remote/RemoteSettings.scala | 4 + .../src/main/scala/akka/remote/Remoting.scala | 15 +- .../scala/akka/remote/ActorsLeakSpec.scala | 178 ++++++++++++++++++ project/MiMa.scala | 8 +- 6 files changed, 248 insertions(+), 20 deletions(-) create mode 100644 akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index e3d9bb5f9a..ab8170b9b3 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -262,6 +262,21 @@ akka { # the affected systems after lifting the quarantine is undefined. prune-quarantine-marker-after = 5 d + # If system messages have been exchanged between two systems (i.e. remote death + # watch or remote deployment has been used) a remote system will be marked as + # quarantined after the two system has no active association, and no + # communication happens during the time configured here. + # The only purpose of this setting is to avoid storing system message redelivery + # data (sequence number state, etc.) for an undefined amount of time leading to long + # term memory leak. Instead, if a system has been gone for this period, + # or more exactly + # - there is no association between the two systems (TCP connection, if TCP transport is used) + # - neither side has been attempting to communicate with the other + # - there are no pending system messages to deliver + # for the amount of time configured here, the remote system will be quarantined and all state + # associated with it will be dropped. + quarantine-after-silence = 5 d + # This setting defines the maximum number of unacknowledged system messages # allowed for a remote system. If this limit is reached the remote system is # declared to be dead and its UID marked as tainted. diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index cec59dc660..8743d364d4 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -21,7 +21,7 @@ import akka.serialization.Serialization import akka.util.ByteString import akka.{ OnlyCauseStackTrace, AkkaException } import java.io.NotSerializableException -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ TimeUnit, TimeoutException, ConcurrentHashMap } import scala.annotation.tailrec import scala.concurrent.duration.{ Duration, Deadline } import scala.util.control.NonFatal @@ -168,6 +168,7 @@ private[remote] object ReliableDeliverySupervisor { case object IsIdle case object Idle + case object TooLongIdle def props( handleOrActive: Option[AkkaProtocolHandle], @@ -200,6 +201,8 @@ private[remote] class ReliableDeliverySupervisor( val autoResendTimer = context.system.scheduler.schedule( settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery) + private var bufferWasInUse = false + override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case e @ (_: AssociationProblem) ⇒ Escalate case NonFatal(e) ⇒ @@ -207,12 +210,14 @@ private[remote] class ReliableDeliverySupervisor( log.warning("Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}", remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy) uidConfirmed = false // Need confirmation of UID again - if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty) - bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout) - context.become(gated) - currentHandle = None - context.parent ! StoppedReading(self) - Stop + if (bufferWasInUse) { + if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty) + bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout) + context.become(gated) + currentHandle = None + context.parent ! StoppedReading(self) + Stop + } else Escalate } var currentHandle: Option[AkkaProtocolHandle] = handleOrActive @@ -237,6 +242,7 @@ private[remote] class ReliableDeliverySupervisor( var writer: ActorRef = createWriter() var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid } var bailoutAt: Option[Deadline] = None + var maxSilenceTimer: Option[Cancellable] = None // Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the // UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for // any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore @@ -255,6 +261,7 @@ private[remote] class ReliableDeliverySupervisor( (resendBuffer.nacked ++ resendBuffer.nonAcked) foreach { s ⇒ context.system.deadLetters ! s.copy(seqOpt = None) } receiveBuffers.remove(Link(localAddress, remoteAddress)) autoResendTimer.cancel() + maxSilenceTimer.foreach(_.cancel()) } override def postRestart(reason: Throwable): Unit = { @@ -291,7 +298,7 @@ private[remote] class ReliableDeliverySupervisor( context.parent ! StoppedReading(self) if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery) - context.become(idle) + goToIdle() case g @ GotUid(receivedUid, _) ⇒ bailoutAt = None context.parent ! g @@ -321,8 +328,8 @@ private[remote] class ReliableDeliverySupervisor( new java.util.concurrent.TimeoutException("Delivery of system messages timed out and they were dropped.")) writer = createWriter() // Resending will be triggered by the incoming GotUid message after the connection finished - context.become(receive) - } else context.become(idle) + goToActive() + } else goToIdle() case AttemptSysMsgRedelivery ⇒ // Ignore case s @ Send(msg: SystemMessage, _, _, _) ⇒ tryBuffer(s.copy(seqOpt = Some(nextSeq()))) case s: Send ⇒ context.system.deadLetters ! s @@ -338,18 +345,34 @@ private[remote] class ReliableDeliverySupervisor( writer = createWriter() // Resending will be triggered by the incoming GotUid message after the connection finished handleSend(s) - context.become(receive) + goToActive() case AttemptSysMsgRedelivery ⇒ if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) { writer = createWriter() // Resending will be triggered by the incoming GotUid message after the connection finished - context.become(receive) + goToActive() } + case TooLongIdle ⇒ + throw new HopelessAssociation(localAddress, remoteAddress, uid, + new TimeoutException("Remote system has been silent for too long. " + + s"(more than ${settings.QuarantineSilentSystemTimeout.toUnit(TimeUnit.HOURS)} hours)")) case EndpointWriter.FlushAndStop ⇒ context.stop(self) case EndpointWriter.StopReading(w, replyTo) ⇒ replyTo ! EndpointWriter.StoppedReading(w) } + private def goToIdle(): Unit = { + if (bufferWasInUse && maxSilenceTimer.isEmpty) + maxSilenceTimer = Some(context.system.scheduler.scheduleOnce(settings.QuarantineSilentSystemTimeout, self, TooLongIdle)) + context.become(idle) + } + + private def goToActive(): Unit = { + maxSilenceTimer.foreach(_.cancel()) + maxSilenceTimer = None + context.become(receive) + } + def flushWait: Receive = { case IsIdle ⇒ // Do not reply, we will Terminate soon, which will do the inbound connection unstashing case Terminated(_) ⇒ @@ -381,6 +404,7 @@ private[remote] class ReliableDeliverySupervisor( private def tryBuffer(s: Send): Unit = try { resendBuffer = resendBuffer buffer s + bufferWasInUse = true } catch { case NonFatal(e) ⇒ throw new HopelessAssociation(localAddress, remoteAddress, uid, e) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 4d655e2abe..a020a800cc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -92,6 +92,10 @@ final class RemoteSettings(val config: Config) { config.getMillisDuration("akka.remote.initial-system-message-delivery-timeout") } requiring (_ > Duration.Zero, "initial-system-message-delivery-timeout must be > 0") + val QuarantineSilentSystemTimeout: FiniteDuration = { + config.getMillisDuration("akka.remote.quarantine-after-silence") + } requiring (_ > Duration.Zero, "quarantine-after-silence must be > 0") + val QuarantineDuration: FiniteDuration = { config.getMillisDuration("akka.remote.prune-quarantine-marker-after").requiring(_ > Duration.Zero, "prune-quarantine-marker-after must be > 0 ms") diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 858dcd40b8..445df3ee84 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -335,6 +335,8 @@ private[remote] object EndpointManager { readonlyToAddress -= endpoint } + def addressForWriter(writer: ActorRef): Option[Address] = writableToAddress.get(writer) + def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address) def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match { @@ -415,11 +417,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends // Mapping between transports and the local addresses they listen to var transportMapping: Map[Address, AkkaProtocolTransport] = Map() - def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero - val pruneInterval: FiniteDuration = if (retryGateEnabled) settings.RetryGateClosedFor * 2 else Duration.Zero - val pruneTimerCancellable: Option[Cancellable] = if (retryGateEnabled) - Some(context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)) - else None + val pruneInterval: FiniteDuration = (settings.RetryGateClosedFor * 2).max(1.second).min(10.seconds) + + val pruneTimerCancellable: Cancellable = + context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune) var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]() var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]() @@ -481,11 +482,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Stop case NonFatal(e) ⇒ - // logging e match { case _: EndpointDisassociatedException | _: EndpointAssociationException ⇒ // no logging case _ ⇒ log.error(e, e.getMessage) } + endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) Stop } @@ -833,7 +834,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends private var normalShutdown = false override def postStop(): Unit = { - pruneTimerCancellable.foreach { _.cancel() } + pruneTimerCancellable.cancel() pendingReadHandoffs.valuesIterator foreach (_.disassociate(AssociationHandle.Shutdown)) if (!normalShutdown) { diff --git a/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala b/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala new file mode 100644 index 0000000000..dfb798677a --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala @@ -0,0 +1,178 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.remote + +import java.util.concurrent.TimeoutException + +import akka.actor._ +import akka.actor.dungeon.ChildrenContainer +import akka.event.Logging +import akka.remote.transport.ThrottlerTransportAdapter.Direction.Both +import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociate, Blackhole, SetThrottle } +import akka.testkit._ +import akka.testkit.TestActors.EchoActor +import com.typesafe.config.ConfigFactory + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ + +object ActorsLeakSpec { + + val config = ConfigFactory.parseString( + """ + | akka.actor.provider = "akka.remote.RemoteActorRefProvider" + | #akka.loglevel = DEBUG + | akka.remote.netty.tcp.applied-adapters = ["trttl"] + | #akka.remote.log-lifecycle-events = on + | akka.remote.transport-failure-detector.heartbeat-interval = 1 s + | akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s + | akka.remote.quarantine-after-silence = 3 s + | akka.test.filter-leeway = 10 s + | + |""".stripMargin) + + def collectLiveActors(root: ActorRef): immutable.Seq[ActorRef] = { + + def recurse(node: ActorRef): List[ActorRef] = { + val children: List[ActorRef] = node match { + case wc: ActorRefWithCell ⇒ + val cell = wc.underlying + + cell.childrenRefs match { + case ChildrenContainer.TerminatingChildrenContainer(_, toDie, reason) ⇒ Nil + case x @ (ChildrenContainer.TerminatedChildrenContainer | ChildrenContainer.EmptyChildrenContainer) ⇒ Nil + case n: ChildrenContainer.NormalChildrenContainer ⇒ cell.childrenRefs.children.toList + case x ⇒ Nil + } + case _ ⇒ Nil + } + + node :: children.flatMap(recurse) + } + + recurse(root) + } + + class StoppableActor extends Actor { + override def receive = { + case "stop" ⇒ context.stop(self) + } + } + +} + +class ActorsLeakSpec extends AkkaSpec(ActorsLeakSpec.config) with ImplicitSender { + import ActorsLeakSpec._ + + "Remoting" must { + + "not leak actors" in { + val ref = system.actorOf(Props[EchoActor], "echo") + val echoPath = RootActorPath(RARP(system).provider.getDefaultAddress) / "user" / "echo" + + val targets = List("/system/endpointManager", "/system/transports").map { path ⇒ + system.actorSelection(path) ! Identify(0) + expectMsgType[ActorIdentity].getRef + } + + val initialActors = targets.flatMap(collectLiveActors).toSet + + //Clean shutdown case + for (_ ← 1 to 3) { + + val remoteSystem = ActorSystem( + "remote", + ConfigFactory.parseString("akka.remote.netty.tcp.port = 0") + .withFallback(config)) + + try { + val probe = TestProbe()(remoteSystem) + + remoteSystem.actorSelection(echoPath).tell(Identify(1), probe.ref) + probe.expectMsgType[ActorIdentity].ref.nonEmpty should be(true) + + } finally { + remoteSystem.terminate() + } + + Await.ready(remoteSystem.whenTerminated, 10.seconds) + } + + // Missing SHUTDOWN case + for (_ ← 1 to 3) { + + val remoteSystem = ActorSystem( + "remote", + ConfigFactory.parseString("akka.remote.netty.tcp.port = 0") + .withFallback(config)) + val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress + + try { + val probe = TestProbe()(remoteSystem) + + remoteSystem.actorSelection(echoPath).tell(Identify(1), probe.ref) + probe.expectMsgType[ActorIdentity].ref.nonEmpty should be(true) + + // This will make sure that no SHUTDOWN message gets through + Await.ready( + RARP(system).provider.transport.managementCommand(ForceDisassociate(remoteAddress)), + 3.seconds) + + } finally { + remoteSystem.terminate() + } + + EventFilter.warning(pattern = "Association with remote system", occurrences = 1).intercept { + Await.ready(remoteSystem.whenTerminated, 10.seconds) + } + } + + // Remote idle for too long case + val remoteSystem = ActorSystem( + "remote", + ConfigFactory.parseString("akka.remote.netty.tcp.port = 0") + .withFallback(config)) + val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress + + remoteSystem.actorOf(Props[StoppableActor], "stoppable") + + try { + val probe = TestProbe()(remoteSystem) + + remoteSystem.actorSelection(echoPath).tell(Identify(1), probe.ref) + probe.expectMsgType[ActorIdentity].ref.nonEmpty should be(true) + + // Watch a remote actor - this results in system message traffic + system.actorSelection(RootActorPath(remoteAddress) / "user" / "stoppable") ! Identify(1) + val remoteActor = expectMsgType[ActorIdentity].ref.get + watch(remoteActor) + remoteActor ! "stop" + expectTerminated(remoteActor) + // All system messages has been acked now on this side + + // This will make sure that no SHUTDOWN message gets through + Await.ready( + RARP(system).provider.transport.managementCommand(ForceDisassociate(remoteAddress)), + 3.seconds) + + } finally { + remoteSystem.terminate() + } + + EventFilter.warning(pattern = "Association with remote system", occurrences = 1).intercept { + Await.ready(remoteSystem.whenTerminated, 10.seconds) + } + + EventFilter[TimeoutException](occurrences = 1).intercept {} + + val finalActors = targets.flatMap(collectLiveActors).toSet + + (finalActors diff initialActors) should be(Set.empty) + + } + + } + +} diff --git a/project/MiMa.scala b/project/MiMa.scala index 1103f081a9..f874b17ec8 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -569,7 +569,13 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingMethodProblem]("akka.japi.Pair.toString") // reported on PR validation machine which uses Java 1.8.0_45 ), "2.3.14" -> bcIssuesBetween23and24, - "2.4.0" -> Seq(FilterAnyProblem("akka.remote.transport.ProtocolStateActor")) + "2.4.0" -> Seq( + FilterAnyProblem("akka.remote.transport.ProtocolStateActor"), + + //#18353 Changes to methods and fields private to remoting actors + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.EndpointManager.retryGateEnabled"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.EndpointManager.pruneTimerCancellable") + ) ) } }