SubSinkInlet/SubSourceOutlet memory leak fix (#29967)

* Memory leak in auto-closing of sub inlets/outlets fixed #29966
* Avoid separate nested instances of the AsyncCallback state classes for each async callback.
This commit is contained in:
Johan Andrén 2021-01-25 15:35:54 +01:00 committed by GitHub
parent 9e97b995f0
commit 0bf29fd232
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 13 deletions

View file

@ -0,0 +1,7 @@
# small optimization of ConcurrentAsyncCallback internal state
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Event$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$State")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Pending$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Event")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Initialized$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Pending")

View file

@ -272,6 +272,22 @@ object GraphStageLogic {
private[stream] val NoPromise: Promise[Done] = Promise.successful(Done) private[stream] val NoPromise: Promise[Done] = Promise.successful(Done)
} }
/**
* INTERNAL API
*/
@InternalApi
private[akka] object ConcurrentAsyncCallbackState {
sealed trait State[+E]
// waiting for materialization completion or during dispatching of initially queued events
final case class Pending[E](pendingEvents: List[Event[E]]) extends State[E]
// stream is initialized and so no threads can just send events without any synchronization overhead
case object Initialized extends State[Nothing]
// Event with feedback promise
final case class Event[E](e: E, handlingPromise: Promise[Done])
val NoPendingEvents = Pending[Nothing](Nil)
}
/** /**
* Represents the processing logic behind a [[GraphStage]]. Roughly speaking, a subclass of [[GraphStageLogic]] is a * Represents the processing logic behind a [[GraphStage]]. Roughly speaking, a subclass of [[GraphStageLogic]] is a
* collection of the following parts: * collection of the following parts:
@ -1174,18 +1190,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* "Real world" calls of [[invokeWithFeedback]] always return failed promises for `Completed` state * "Real world" calls of [[invokeWithFeedback]] always return failed promises for `Completed` state
*/ */
private final class ConcurrentAsyncCallback[T](handler: T => Unit) extends AsyncCallback[T] { private final class ConcurrentAsyncCallback[T](handler: T => Unit) extends AsyncCallback[T] {
import ConcurrentAsyncCallbackState._
sealed trait State private[this] val currentState = new AtomicReference[State[T]](NoPendingEvents)
// waiting for materialization completion or during dispatching of initially queued events
// - can't be final because of SI-4440
private case class Pending(pendingEvents: List[Event]) extends State
// stream is initialized and so no threads can just send events without any synchronization overhead
private case object Initialized extends State
// Event with feedback promise - can't be final because of SI-4440
private case class Event(e: T, handlingPromise: Promise[Done])
private[this] val NoPendingEvents = Pending(Nil)
private[this] val currentState = new AtomicReference[State](NoPendingEvents)
// is called from the owning [[GraphStage]] // is called from the owning [[GraphStage]]
@tailrec @tailrec
@ -1240,7 +1246,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
case list @ Pending(l) => case list @ Pending(l) =>
// not started yet // not started yet
if (!currentState.compareAndSet(list, Pending(Event(event, promise) :: l))) if (!currentState.compareAndSet(list, Pending[T](Event[T](event, promise) :: l)))
invokeWithPromise(event, promise) invokeWithPromise(event, promise)
} }
@ -1435,9 +1441,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
case OnComplete => case OnComplete =>
closed = true closed = true
handler.onUpstreamFinish() handler.onUpstreamFinish()
GraphStageLogic.this.completedOrFailed(this)
case OnError(ex) => case OnError(ex) =>
closed = true closed = true
handler.onUpstreamFailure(ex) handler.onUpstreamFailure(ex)
GraphStageLogic.this.completedOrFailed(this)
} }
}.invoke _) }.invoke _)
@ -1513,6 +1521,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
available = false available = false
closed = true closed = true
handler.onDownstreamFinish(cause) handler.onDownstreamFinish(cause)
GraphStageLogic.this.completedOrFailed(this)
} }
} }