Merge pull request #26626 from longshorej/watch-with-mem-leak

Fix memory leak in watchWith (#26625)
This commit is contained in:
Patrik Nordwall 2019-03-29 18:09:22 +01:00 committed by GitHub
commit c06cf62b64
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 16 deletions

View file

@ -19,3 +19,10 @@ ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$Sorte
ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$SortedSetOps$")
ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package$")
ProblemFilters.exclude[MissingClassProblem]("akka.util.ccompat.imm.package")
# Fix memory leak in watchWith - PR #26625
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.terminatedQueuedFor")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.dungeon.DeathWatch.terminatedQueuedFor")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$terminatedQueued")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$terminatedQueued_=")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.terminatedQueuedFor")

View file

@ -256,7 +256,7 @@ private[akka] trait StashSupport {
private def enqueueFirst(envelope: Envelope): Unit = {
mailbox.enqueueFirst(self, envelope)
envelope.message match {
case Terminated(ref) => actorCell.terminatedQueuedFor(ref)
case Terminated(ref) => actorCell.terminatedQueuedFor(ref, None)
case _ =>
}
}

View file

@ -18,7 +18,7 @@ private[akka] trait DeathWatch { this: ActorCell =>
*/
private var watching: Map[ActorRef, Option[Any]] = Map.empty
private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet
private var terminatedQueued: Set[ActorRef] = ActorCell.emptyActorRefSet
private var terminatedQueued: Map[ActorRef, Option[Any]] = Map.empty
def isWatching(ref: ActorRef): Boolean = watching contains ref
@ -56,14 +56,14 @@ private[akka] trait DeathWatch { this: ActorCell =>
watching = removeFromMap(a, watching)
}
}
terminatedQueued = removeFromSet(a, terminatedQueued)
terminatedQueued = removeFromMap(a, terminatedQueued)
a
}
protected def receivedTerminated(t: Terminated): Unit =
if (terminatedQueued(t.actor)) {
terminatedQueued.get(t.actor).foreach { optionalMessage
terminatedQueued -= t.actor // here we know that it is the SAME ref which was put in
receiveMessage(t)
receiveMessage(optionalMessage.getOrElse(t))
}
/**
@ -81,15 +81,16 @@ private[akka] trait DeathWatch { this: ActorCell =>
watching = removeFromMap(actor, watching)
}
if (!isTerminating) {
self.tell(optionalMessage.getOrElse(Terminated(actor)(existenceConfirmed, addressTerminated)), actor)
terminatedQueuedFor(actor)
self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor)
terminatedQueuedFor(actor, optionalMessage)
}
}
if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor)
}
private[akka] def terminatedQueuedFor(subject: ActorRef): Unit =
terminatedQueued += subject
private[akka] def terminatedQueuedFor(subject: ActorRef, customMessage: Option[Any]): Unit =
if (!terminatedQueued.contains(subject))
terminatedQueued += subject -> customMessage
// TODO this should be removed and be replaced with `watching.contains(subject)`
// when all actor references have uid, i.e. actorFor is removed
@ -109,12 +110,6 @@ private[akka] trait DeathWatch { this: ActorCell =>
if (subject.path.uid == ActorCell.undefinedUid) None
else watching.get(new UndefinedUidActorRef(subject)))
// TODO this should be removed and be replaced with `set - subject`
// when all actor references have uid, i.e. actorFor is removed
private def removeFromSet(subject: ActorRef, set: Set[ActorRef]): Set[ActorRef] =
if (subject.path.uid != ActorCell.undefinedUid) (set - subject) - new UndefinedUidActorRef(subject)
else set.filterNot(_.path == subject.path)
// TODO this should be removed and be replaced with `set - subject`
// when all actor references have uid, i.e. actorFor is removed
private def removeFromMap[T](subject: ActorRef, map: Map[ActorRef, T]): Map[ActorRef, T] =
@ -177,7 +172,7 @@ private[akka] trait DeathWatch { this: ActorCell =>
}
} finally {
watching = Map.empty
terminatedQueued = ActorCell.emptyActorRefSet
terminatedQueued = Map.empty
}
}
}