From c3cedbde27550467c668f02e92fdec0521c60a23 Mon Sep 17 00:00:00 2001 From: Jason Longshore Date: Wed, 27 Mar 2019 18:16:45 -0500 Subject: [PATCH] Fix memory leak in watchWith (#26625) Instead of delivering the custom message, store it locally and then deliver it when the Terminated instance is received. This ensures that terminatedQueued is properly cleaned when watchWith is used. --- .../mima-filters/2.5.21.backwards.excludes | 7 ++++++ .../src/main/scala/akka/actor/Stash.scala | 2 +- .../scala/akka/actor/dungeon/DeathWatch.scala | 25 ++++++++----------- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/akka-actor/src/main/mima-filters/2.5.21.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.21.backwards.excludes index e2d7c6a952..e96c7ddd15 100644 --- a/akka-actor/src/main/mima-filters/2.5.21.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.21.backwards.excludes @@ -19,3 +19,10 @@ ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$Sorte ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$SortedSetOps$") ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$") ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package") + +# Fix memory leak in watchWith - PR #26625 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.terminatedQueuedFor") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.dungeon.DeathWatch.terminatedQueuedFor") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$terminatedQueued") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$terminatedQueued_=") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.terminatedQueuedFor") \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index ee49e43845..cb372e74d0 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -256,7 +256,7 @@ private[akka] trait StashSupport { private def enqueueFirst(envelope: Envelope): Unit = { mailbox.enqueueFirst(self, envelope) envelope.message match { - case Terminated(ref) => actorCell.terminatedQueuedFor(ref) + case Terminated(ref) => actorCell.terminatedQueuedFor(ref, None) case _ => } } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index f2fdc6f464..55eb44113a 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -18,7 +18,7 @@ private[akka] trait DeathWatch { this: ActorCell => */ private var watching: Map[ActorRef, Option[Any]] = Map.empty private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet - private var terminatedQueued: Set[ActorRef] = ActorCell.emptyActorRefSet + private var terminatedQueued: Map[ActorRef, Option[Any]] = Map.empty def isWatching(ref: ActorRef): Boolean = watching contains ref @@ -56,14 +56,14 @@ private[akka] trait DeathWatch { this: ActorCell => watching = removeFromMap(a, watching) } } - terminatedQueued = removeFromSet(a, terminatedQueued) + terminatedQueued = removeFromMap(a, terminatedQueued) a } protected def receivedTerminated(t: Terminated): Unit = - if (terminatedQueued(t.actor)) { + terminatedQueued.get(t.actor).foreach { optionalMessage ⇒ terminatedQueued -= t.actor // here we know that it is the SAME ref which was put in - receiveMessage(t) + receiveMessage(optionalMessage.getOrElse(t)) } /** @@ -81,15 +81,16 @@ private[akka] trait DeathWatch { this: ActorCell => watching = removeFromMap(actor, watching) } if (!isTerminating) { - self.tell(optionalMessage.getOrElse(Terminated(actor)(existenceConfirmed, addressTerminated)), actor) - terminatedQueuedFor(actor) + self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor) + terminatedQueuedFor(actor, optionalMessage) } } if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor) } - private[akka] def terminatedQueuedFor(subject: ActorRef): Unit = - terminatedQueued += subject + private[akka] def terminatedQueuedFor(subject: ActorRef, customMessage: Option[Any]): Unit = + if (!terminatedQueued.contains(subject)) + terminatedQueued += subject -> customMessage // TODO this should be removed and be replaced with `watching.contains(subject)` // when all actor references have uid, i.e. actorFor is removed @@ -109,12 +110,6 @@ private[akka] trait DeathWatch { this: ActorCell => if (subject.path.uid == ActorCell.undefinedUid) None else watching.get(new UndefinedUidActorRef(subject))) - // TODO this should be removed and be replaced with `set - subject` - // when all actor references have uid, i.e. actorFor is removed - private def removeFromSet(subject: ActorRef, set: Set[ActorRef]): Set[ActorRef] = - if (subject.path.uid != ActorCell.undefinedUid) (set - subject) - new UndefinedUidActorRef(subject) - else set.filterNot(_.path == subject.path) - // TODO this should be removed and be replaced with `set - subject` // when all actor references have uid, i.e. actorFor is removed private def removeFromMap[T](subject: ActorRef, map: Map[ActorRef, T]): Map[ActorRef, T] = @@ -177,7 +172,7 @@ private[akka] trait DeathWatch { this: ActorCell => } } finally { watching = Map.empty - terminatedQueued = ActorCell.emptyActorRefSet + terminatedQueued = Map.empty } } }