+rem #18353: Prune reliable deliver actors

(cherry picked from commit 6643f56)
This commit is contained in:
Endre Sándor Varga 2015-09-16 15:26:24 +02:00
parent fc75eb361a
commit c4e326c9dd
6 changed files with 248 additions and 20 deletions

View file

@ -262,6 +262,21 @@ akka {
# the affected systems after lifting the quarantine is undefined.
prune-quarantine-marker-after = 5 d
# If system messages have been exchanged between two systems (i.e. remote death
# watch or remote deployment has been used) a remote system will be marked as
# quarantined after the two system has no active association, and no
# communication happens during the time configured here.
# The only purpose of this setting is to avoid storing system message redelivery
# data (sequence number state, etc.) for an undefined amount of time leading to long
# term memory leak. Instead, if a system has been gone for this period,
# or more exactly
# - there is no association between the two systems (TCP connection, if TCP transport is used)
# - neither side has been attempting to communicate with the other
# - there are no pending system messages to deliver
# for the amount of time configured here, the remote system will be quarantined and all state
# associated with it will be dropped.
quarantine-after-silence = 5 d
# This setting defines the maximum number of unacknowledged system messages
# allowed for a remote system. If this limit is reached the remote system is
# declared to be dead and its UID marked as tainted.

View file

@ -21,7 +21,7 @@ import akka.serialization.Serialization
import akka.util.ByteString
import akka.{ OnlyCauseStackTrace, AkkaException }
import java.io.NotSerializableException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ TimeUnit, TimeoutException, ConcurrentHashMap }
import scala.annotation.tailrec
import scala.concurrent.duration.{ Duration, Deadline }
import scala.util.control.NonFatal
@ -168,6 +168,7 @@ private[remote] object ReliableDeliverySupervisor {
case object IsIdle
case object Idle
case object TooLongIdle
def props(
handleOrActive: Option[AkkaProtocolHandle],
@ -200,6 +201,8 @@ private[remote] class ReliableDeliverySupervisor(
val autoResendTimer = context.system.scheduler.schedule(
settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
private var bufferWasInUse = false
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case e @ (_: AssociationProblem) Escalate
case NonFatal(e)
@ -207,12 +210,14 @@ private[remote] class ReliableDeliverySupervisor(
log.warning("Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}",
remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy)
uidConfirmed = false // Need confirmation of UID again
if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
context.become(gated)
currentHandle = None
context.parent ! StoppedReading(self)
Stop
if (bufferWasInUse) {
if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
context.become(gated)
currentHandle = None
context.parent ! StoppedReading(self)
Stop
} else Escalate
}
var currentHandle: Option[AkkaProtocolHandle] = handleOrActive
@ -237,6 +242,7 @@ private[remote] class ReliableDeliverySupervisor(
var writer: ActorRef = createWriter()
var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid }
var bailoutAt: Option[Deadline] = None
var maxSilenceTimer: Option[Cancellable] = None
// Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the
// UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for
// any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore
@ -255,6 +261,7 @@ private[remote] class ReliableDeliverySupervisor(
(resendBuffer.nacked ++ resendBuffer.nonAcked) foreach { s context.system.deadLetters ! s.copy(seqOpt = None) }
receiveBuffers.remove(Link(localAddress, remoteAddress))
autoResendTimer.cancel()
maxSilenceTimer.foreach(_.cancel())
}
override def postRestart(reason: Throwable): Unit = {
@ -291,7 +298,7 @@ private[remote] class ReliableDeliverySupervisor(
context.parent ! StoppedReading(self)
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty)
context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
context.become(idle)
goToIdle()
case g @ GotUid(receivedUid, _)
bailoutAt = None
context.parent ! g
@ -321,8 +328,8 @@ private[remote] class ReliableDeliverySupervisor(
new java.util.concurrent.TimeoutException("Delivery of system messages timed out and they were dropped."))
writer = createWriter()
// Resending will be triggered by the incoming GotUid message after the connection finished
context.become(receive)
} else context.become(idle)
goToActive()
} else goToIdle()
case AttemptSysMsgRedelivery // Ignore
case s @ Send(msg: SystemMessage, _, _, _) tryBuffer(s.copy(seqOpt = Some(nextSeq())))
case s: Send context.system.deadLetters ! s
@ -338,18 +345,34 @@ private[remote] class ReliableDeliverySupervisor(
writer = createWriter()
// Resending will be triggered by the incoming GotUid message after the connection finished
handleSend(s)
context.become(receive)
goToActive()
case AttemptSysMsgRedelivery
if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) {
writer = createWriter()
// Resending will be triggered by the incoming GotUid message after the connection finished
context.become(receive)
goToActive()
}
case TooLongIdle
throw new HopelessAssociation(localAddress, remoteAddress, uid,
new TimeoutException("Remote system has been silent for too long. " +
s"(more than ${settings.QuarantineSilentSystemTimeout.toUnit(TimeUnit.HOURS)} hours)"))
case EndpointWriter.FlushAndStop context.stop(self)
case EndpointWriter.StopReading(w, replyTo)
replyTo ! EndpointWriter.StoppedReading(w)
}
private def goToIdle(): Unit = {
if (bufferWasInUse && maxSilenceTimer.isEmpty)
maxSilenceTimer = Some(context.system.scheduler.scheduleOnce(settings.QuarantineSilentSystemTimeout, self, TooLongIdle))
context.become(idle)
}
private def goToActive(): Unit = {
maxSilenceTimer.foreach(_.cancel())
maxSilenceTimer = None
context.become(receive)
}
def flushWait: Receive = {
case IsIdle // Do not reply, we will Terminate soon, which will do the inbound connection unstashing
case Terminated(_)
@ -381,6 +404,7 @@ private[remote] class ReliableDeliverySupervisor(
private def tryBuffer(s: Send): Unit =
try {
resendBuffer = resendBuffer buffer s
bufferWasInUse = true
} catch {
case NonFatal(e) throw new HopelessAssociation(localAddress, remoteAddress, uid, e)
}

View file

@ -92,6 +92,10 @@ final class RemoteSettings(val config: Config) {
config.getMillisDuration("akka.remote.initial-system-message-delivery-timeout")
} requiring (_ > Duration.Zero, "initial-system-message-delivery-timeout must be > 0")
val QuarantineSilentSystemTimeout: FiniteDuration = {
config.getMillisDuration("akka.remote.quarantine-after-silence")
} requiring (_ > Duration.Zero, "quarantine-after-silence must be > 0")
val QuarantineDuration: FiniteDuration = {
config.getMillisDuration("akka.remote.prune-quarantine-marker-after").requiring(_ > Duration.Zero,
"prune-quarantine-marker-after must be > 0 ms")

View file

@ -335,6 +335,8 @@ private[remote] object EndpointManager {
readonlyToAddress -= endpoint
}
def addressForWriter(writer: ActorRef): Option[Address] = writableToAddress.get(writer)
def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address)
def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match {
@ -415,11 +417,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
// Mapping between transports and the local addresses they listen to
var transportMapping: Map[Address, AkkaProtocolTransport] = Map()
def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero
val pruneInterval: FiniteDuration = if (retryGateEnabled) settings.RetryGateClosedFor * 2 else Duration.Zero
val pruneTimerCancellable: Option[Cancellable] = if (retryGateEnabled)
Some(context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune))
else None
val pruneInterval: FiniteDuration = (settings.RetryGateClosedFor * 2).max(1.second).min(10.seconds)
val pruneTimerCancellable: Cancellable =
context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()
@ -481,11 +482,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
Stop
case NonFatal(e)
// logging
e match {
case _: EndpointDisassociatedException | _: EndpointAssociationException // no logging
case _ log.error(e, e.getMessage)
}
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
Stop
}
@ -833,7 +834,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
private var normalShutdown = false
override def postStop(): Unit = {
pruneTimerCancellable.foreach { _.cancel() }
pruneTimerCancellable.cancel()
pendingReadHandoffs.valuesIterator foreach (_.disassociate(AssociationHandle.Shutdown))
if (!normalShutdown) {

View file

@ -0,0 +1,178 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import java.util.concurrent.TimeoutException
import akka.actor._
import akka.actor.dungeon.ChildrenContainer
import akka.event.Logging
import akka.remote.transport.ThrottlerTransportAdapter.Direction.Both
import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociate, Blackhole, SetThrottle }
import akka.testkit._
import akka.testkit.TestActors.EchoActor
import com.typesafe.config.ConfigFactory
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
object ActorsLeakSpec {
val config = ConfigFactory.parseString(
"""
| akka.actor.provider = "akka.remote.RemoteActorRefProvider"
| #akka.loglevel = DEBUG
| akka.remote.netty.tcp.applied-adapters = ["trttl"]
| #akka.remote.log-lifecycle-events = on
| akka.remote.transport-failure-detector.heartbeat-interval = 1 s
| akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
| akka.remote.quarantine-after-silence = 3 s
| akka.test.filter-leeway = 10 s
|
|""".stripMargin)
def collectLiveActors(root: ActorRef): immutable.Seq[ActorRef] = {
def recurse(node: ActorRef): List[ActorRef] = {
val children: List[ActorRef] = node match {
case wc: ActorRefWithCell
val cell = wc.underlying
cell.childrenRefs match {
case ChildrenContainer.TerminatingChildrenContainer(_, toDie, reason) Nil
case x @ (ChildrenContainer.TerminatedChildrenContainer | ChildrenContainer.EmptyChildrenContainer) Nil
case n: ChildrenContainer.NormalChildrenContainer cell.childrenRefs.children.toList
case x Nil
}
case _ Nil
}
node :: children.flatMap(recurse)
}
recurse(root)
}
class StoppableActor extends Actor {
override def receive = {
case "stop" context.stop(self)
}
}
}
class ActorsLeakSpec extends AkkaSpec(ActorsLeakSpec.config) with ImplicitSender {
import ActorsLeakSpec._
"Remoting" must {
"not leak actors" in {
val ref = system.actorOf(Props[EchoActor], "echo")
val echoPath = RootActorPath(RARP(system).provider.getDefaultAddress) / "user" / "echo"
val targets = List("/system/endpointManager", "/system/transports").map { path
system.actorSelection(path) ! Identify(0)
expectMsgType[ActorIdentity].getRef
}
val initialActors = targets.flatMap(collectLiveActors).toSet
//Clean shutdown case
for (_ 1 to 3) {
val remoteSystem = ActorSystem(
"remote",
ConfigFactory.parseString("akka.remote.netty.tcp.port = 0")
.withFallback(config))
try {
val probe = TestProbe()(remoteSystem)
remoteSystem.actorSelection(echoPath).tell(Identify(1), probe.ref)
probe.expectMsgType[ActorIdentity].ref.nonEmpty should be(true)
} finally {
remoteSystem.terminate()
}
Await.ready(remoteSystem.whenTerminated, 10.seconds)
}
// Missing SHUTDOWN case
for (_ 1 to 3) {
val remoteSystem = ActorSystem(
"remote",
ConfigFactory.parseString("akka.remote.netty.tcp.port = 0")
.withFallback(config))
val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress
try {
val probe = TestProbe()(remoteSystem)
remoteSystem.actorSelection(echoPath).tell(Identify(1), probe.ref)
probe.expectMsgType[ActorIdentity].ref.nonEmpty should be(true)
// This will make sure that no SHUTDOWN message gets through
Await.ready(
RARP(system).provider.transport.managementCommand(ForceDisassociate(remoteAddress)),
3.seconds)
} finally {
remoteSystem.terminate()
}
EventFilter.warning(pattern = "Association with remote system", occurrences = 1).intercept {
Await.ready(remoteSystem.whenTerminated, 10.seconds)
}
}
// Remote idle for too long case
val remoteSystem = ActorSystem(
"remote",
ConfigFactory.parseString("akka.remote.netty.tcp.port = 0")
.withFallback(config))
val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress
remoteSystem.actorOf(Props[StoppableActor], "stoppable")
try {
val probe = TestProbe()(remoteSystem)
remoteSystem.actorSelection(echoPath).tell(Identify(1), probe.ref)
probe.expectMsgType[ActorIdentity].ref.nonEmpty should be(true)
// Watch a remote actor - this results in system message traffic
system.actorSelection(RootActorPath(remoteAddress) / "user" / "stoppable") ! Identify(1)
val remoteActor = expectMsgType[ActorIdentity].ref.get
watch(remoteActor)
remoteActor ! "stop"
expectTerminated(remoteActor)
// All system messages has been acked now on this side
// This will make sure that no SHUTDOWN message gets through
Await.ready(
RARP(system).provider.transport.managementCommand(ForceDisassociate(remoteAddress)),
3.seconds)
} finally {
remoteSystem.terminate()
}
EventFilter.warning(pattern = "Association with remote system", occurrences = 1).intercept {
Await.ready(remoteSystem.whenTerminated, 10.seconds)
}
EventFilter[TimeoutException](occurrences = 1).intercept {}
val finalActors = targets.flatMap(collectLiveActors).toSet
(finalActors diff initialActors) should be(Set.empty)
}
}
}

View file

@ -569,7 +569,13 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[MissingMethodProblem]("akka.japi.Pair.toString") // reported on PR validation machine which uses Java 1.8.0_45
),
"2.3.14" -> bcIssuesBetween23and24,
"2.4.0" -> Seq(FilterAnyProblem("akka.remote.transport.ProtocolStateActor"))
"2.4.0" -> Seq(
FilterAnyProblem("akka.remote.transport.ProtocolStateActor"),
//#18353 Changes to methods and fields private to remoting actors
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.EndpointManager.retryGateEnabled"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.EndpointManager.pruneTimerCancellable")
)
)
}
}