Fix memory leak in watchWith (#26625)

Instead of delivering the custom message, store it locally and then
deliver it when the Terminated instance is received.

This ensures that terminatedQueued is properly cleaned when watchWith is
used.
This commit is contained in:
Jason Longshore 2019-03-27 18:16:45 -05:00
parent ef896f533e
commit c3cedbde27
3 changed files with 18 additions and 16 deletions

View file

@ -19,3 +19,10 @@ ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$Sorte
ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$SortedSetOps$") ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$SortedSetOps$")
ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$") ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$")
ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package") ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package")
# Fix memory leak in watchWith - PR #26625
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.terminatedQueuedFor")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.dungeon.DeathWatch.terminatedQueuedFor")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$terminatedQueued")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$terminatedQueued_=")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.terminatedQueuedFor")

View file

@ -256,7 +256,7 @@ private[akka] trait StashSupport {
private def enqueueFirst(envelope: Envelope): Unit = { private def enqueueFirst(envelope: Envelope): Unit = {
mailbox.enqueueFirst(self, envelope) mailbox.enqueueFirst(self, envelope)
envelope.message match { envelope.message match {
case Terminated(ref) => actorCell.terminatedQueuedFor(ref) case Terminated(ref) => actorCell.terminatedQueuedFor(ref, None)
case _ => case _ =>
} }
} }

View file

@ -18,7 +18,7 @@ private[akka] trait DeathWatch { this: ActorCell =>
*/ */
private var watching: Map[ActorRef, Option[Any]] = Map.empty private var watching: Map[ActorRef, Option[Any]] = Map.empty
private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet
private var terminatedQueued: Set[ActorRef] = ActorCell.emptyActorRefSet private var terminatedQueued: Map[ActorRef, Option[Any]] = Map.empty
def isWatching(ref: ActorRef): Boolean = watching contains ref def isWatching(ref: ActorRef): Boolean = watching contains ref
@ -56,14 +56,14 @@ private[akka] trait DeathWatch { this: ActorCell =>
watching = removeFromMap(a, watching) watching = removeFromMap(a, watching)
} }
} }
terminatedQueued = removeFromSet(a, terminatedQueued) terminatedQueued = removeFromMap(a, terminatedQueued)
a a
} }
protected def receivedTerminated(t: Terminated): Unit = protected def receivedTerminated(t: Terminated): Unit =
if (terminatedQueued(t.actor)) { terminatedQueued.get(t.actor).foreach { optionalMessage
terminatedQueued -= t.actor // here we know that it is the SAME ref which was put in terminatedQueued -= t.actor // here we know that it is the SAME ref which was put in
receiveMessage(t) receiveMessage(optionalMessage.getOrElse(t))
} }
/** /**
@ -81,15 +81,16 @@ private[akka] trait DeathWatch { this: ActorCell =>
watching = removeFromMap(actor, watching) watching = removeFromMap(actor, watching)
} }
if (!isTerminating) { if (!isTerminating) {
self.tell(optionalMessage.getOrElse(Terminated(actor)(existenceConfirmed, addressTerminated)), actor) self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor)
terminatedQueuedFor(actor) terminatedQueuedFor(actor, optionalMessage)
} }
} }
if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor) if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor)
} }
private[akka] def terminatedQueuedFor(subject: ActorRef): Unit = private[akka] def terminatedQueuedFor(subject: ActorRef, customMessage: Option[Any]): Unit =
terminatedQueued += subject if (!terminatedQueued.contains(subject))
terminatedQueued += subject -> customMessage
// TODO this should be removed and be replaced with `watching.contains(subject)` // TODO this should be removed and be replaced with `watching.contains(subject)`
// when all actor references have uid, i.e. actorFor is removed // when all actor references have uid, i.e. actorFor is removed
@ -109,12 +110,6 @@ private[akka] trait DeathWatch { this: ActorCell =>
if (subject.path.uid == ActorCell.undefinedUid) None if (subject.path.uid == ActorCell.undefinedUid) None
else watching.get(new UndefinedUidActorRef(subject))) else watching.get(new UndefinedUidActorRef(subject)))
// TODO this should be removed and be replaced with `set - subject`
// when all actor references have uid, i.e. actorFor is removed
private def removeFromSet(subject: ActorRef, set: Set[ActorRef]): Set[ActorRef] =
if (subject.path.uid != ActorCell.undefinedUid) (set - subject) - new UndefinedUidActorRef(subject)
else set.filterNot(_.path == subject.path)
// TODO this should be removed and be replaced with `set - subject` // TODO this should be removed and be replaced with `set - subject`
// when all actor references have uid, i.e. actorFor is removed // when all actor references have uid, i.e. actorFor is removed
private def removeFromMap[T](subject: ActorRef, map: Map[ActorRef, T]): Map[ActorRef, T] = private def removeFromMap[T](subject: ActorRef, map: Map[ActorRef, T]): Map[ActorRef, T] =
@ -177,7 +172,7 @@ private[akka] trait DeathWatch { this: ActorCell =>
} }
} finally { } finally {
watching = Map.empty watching = Map.empty
terminatedQueued = ActorCell.emptyActorRefSet terminatedQueued = Map.empty
} }
} }
} }