diff --git a/akka-stream/src/main/mima-filters/2.6.11.backwards.excludes/29966-sub-inlet-outlet-memleak.backward.excludes b/akka-stream/src/main/mima-filters/2.6.11.backwards.excludes/29966-sub-inlet-outlet-memleak.backward.excludes new file mode 100644 index 0000000000..5e4ed2b7a2 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.11.backwards.excludes/29966-sub-inlet-outlet-memleak.backward.excludes @@ -0,0 +1,7 @@ +# small optimization of ConcurrentAsyncCallback internal state +ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Event$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$State") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Pending$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Event") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Initialized$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.GraphStageLogic$ConcurrentAsyncCallback$Pending") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index ce0989411a..5fe7550abf 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -272,6 +272,22 @@ object GraphStageLogic { private[stream] val NoPromise: Promise[Done] = Promise.successful(Done) } +/** + * INTERNAL API + */ +@InternalApi +private[akka] object ConcurrentAsyncCallbackState { + sealed trait State[+E] + // waiting for materialization completion or during dispatching of initially queued events + final case class Pending[E](pendingEvents: List[Event[E]]) extends State[E] + // stream is initialized and so no threads can just send events without any synchronization overhead + case object Initialized extends State[Nothing] + // Event with feedback promise + final case class Event[E](e: E, handlingPromise: Promise[Done]) + + val NoPendingEvents = Pending[Nothing](Nil) +} + /** * Represents the processing logic behind a [[GraphStage]]. Roughly speaking, a subclass of [[GraphStageLogic]] is a * collection of the following parts: @@ -1174,18 +1190,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * "Real world" calls of [[invokeWithFeedback]] always return failed promises for `Completed` state */ private final class ConcurrentAsyncCallback[T](handler: T => Unit) extends AsyncCallback[T] { - - sealed trait State - // waiting for materialization completion or during dispatching of initially queued events - // - can't be final because of SI-4440 - private case class Pending(pendingEvents: List[Event]) extends State - // stream is initialized and so no threads can just send events without any synchronization overhead - private case object Initialized extends State - // Event with feedback promise - can't be final because of SI-4440 - private case class Event(e: T, handlingPromise: Promise[Done]) - - private[this] val NoPendingEvents = Pending(Nil) - private[this] val currentState = new AtomicReference[State](NoPendingEvents) + import ConcurrentAsyncCallbackState._ + private[this] val currentState = new AtomicReference[State[T]](NoPendingEvents) // is called from the owning [[GraphStage]] @tailrec @@ -1240,7 +1246,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: case list @ Pending(l) => // not started yet - if (!currentState.compareAndSet(list, Pending(Event(event, promise) :: l))) + if (!currentState.compareAndSet(list, Pending[T](Event[T](event, promise) :: l))) invokeWithPromise(event, promise) } @@ -1435,9 +1441,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: case OnComplete => closed = true handler.onUpstreamFinish() + GraphStageLogic.this.completedOrFailed(this) case OnError(ex) => closed = true handler.onUpstreamFailure(ex) + GraphStageLogic.this.completedOrFailed(this) } }.invoke _) @@ -1513,6 +1521,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: available = false closed = true handler.onDownstreamFinish(cause) + GraphStageLogic.this.completedOrFailed(this) } }