free memory when FunctionRef is stopped, #28959 (#29026)

This commit is contained in:
Patrik Nordwall 2020-05-07 15:20:56 +02:00 committed by GitHub
parent 2223413503
commit 066f461916
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -809,6 +809,15 @@ private[akka] class VirtualPathContainer(
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object FunctionRef {
def deadLetterMessageHandler(system: ActorSystem): (ActorRef, Any) => Unit = { (sender, msg) =>
system.deadLetters.tell(msg, sender)
}
}
/**
* INTERNAL API
*
@ -826,17 +835,20 @@ private[akka] class VirtualPathContainer(
* [[FunctionRef#unwatch]] must be called to avoid a resource leak, which is different
* from an ordinary actor.
*/
private[akka] final class FunctionRef(
@InternalApi private[akka] final class FunctionRef(
override val path: ActorPath,
override val provider: ActorRefProvider,
system: ActorSystem,
f: (ActorRef, Any) => Unit)
extends MinimalActorRef {
// var because it's replaced in `stop`
private var messageHandler: (ActorRef, Any) => Unit = f
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
message match {
case AddressTerminated(address) => addressTerminated(address)
case _ => f(sender, message)
case _ => messageHandler(sender, message)
}
}
@ -922,7 +934,13 @@ private[akka] final class FunctionRef(
}
}
override def stop(): Unit = sendTerminated()
override def stop(): Unit = {
sendTerminated()
// The messageHandler function may close over a large object graph (such as an Akka Stream)
// so we replace the messageHandler function to make that available for garbage collection.
// Doesn't matter if the change isn't visible immediately, volatile not needed.
messageHandler = FunctionRef.deadLetterMessageHandler(system)
}
private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val selfTerminated = this.synchronized {