diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index aba8c7f680..30b89669d7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -498,43 +498,39 @@ import pekko.util.ccompat.JavaConverters._ override def initialAttributes: Attributes = DefaultAttributes.split and SourceLocation.forLambda(p) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - import Split._ + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { parent => + import Split._ - private val SubscriptionTimer = "SubstreamSubscriptionTimer" + private val SubscriptionTimer = "SubstreamSubscriptionTimer" - private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider - private val timeout: FiniteDuration = - inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout - private var substreamSource: SubSourceOutlet[T] = null - private var substreamWaitingToBePushed = false - private var substreamCancelled = false + private val timeout: FiniteDuration = + inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout + private var substreamSource: SubSourceOutlet[T] = null + private var substreamWaitingToBePushed = false + private var substreamCancelled = false - def propagateSubstreamCancel(ex: Throwable): Boolean = - decider(ex) match { - case Supervision.Stop => true - case Supervision.Resume => false - case Supervision.Restart => false + def propagateSubstreamCancel(ex: Throwable): Boolean = + decider(ex) match { + case Supervision.Stop => true + case Supervision.Resume => false + case Supervision.Restart => false + } + + override def onPull(): Unit = { + if (substreamSource eq null) { + // can be already pulled from substream in case split after + if (!hasBeenPulled(in)) pull(in) + } else if (substreamWaitingToBePushed) pushSubstreamSource() } - setHandler( - out, - new OutHandler { - override def onPull(): Unit = { - if (substreamSource eq null) { - // can be already pulled from substream in case split after - if (!hasBeenPulled(in)) pull(in) - } else if (substreamWaitingToBePushed) pushSubstreamSource() - } + override def onDownstreamFinish(cause: Throwable): Unit = { + // If the substream is already cancelled or it has not been handed out, we can go away + if ((substreamSource eq null) || substreamWaitingToBePushed || substreamCancelled) cancelStage(cause) + } - override def onDownstreamFinish(cause: Throwable): Unit = { - // If the substream is already cancelled or it has not been handed out, we can go away - if ((substreamSource eq null) || substreamWaitingToBePushed || substreamCancelled) cancelStage(cause) - } - }) - - val initInHandler = new InHandler { override def onPush(): Unit = { val handler = new SubstreamHandler val elem = grab(in) @@ -549,115 +545,115 @@ import pekko.util.ccompat.JavaConverters._ handOver(handler) } + override def onUpstreamFinish(): Unit = completeStage() - } - // initial input handler - setHandler(in, initInHandler) + // initial input handler + setHandlers(in, out, this) - private def handOver(handler: SubstreamHandler): Unit = { - if (isClosed(out)) completeStage() - else { - substreamSource = new SubSourceOutlet[T]("SplitSource") - substreamSource.setHandler(handler) - substreamCancelled = false - setHandler(in, handler) - setKeepGoing(enabled = handler.hasInitialElement) + private def handOver(handler: SubstreamHandler): Unit = { + if (isClosed(out)) completeStage() + else { + substreamSource = new SubSourceOutlet[T]("SplitSource") + substreamSource.setHandler(handler) + substreamCancelled = false + setHandler(in, handler) + setKeepGoing(enabled = handler.hasInitialElement) - if (isAvailable(out)) { - if (decision == SplitBefore || handler.hasInitialElement) pushSubstreamSource() else pull(in) - } else substreamWaitingToBePushed = true - } - } - - private def pushSubstreamSource(): Unit = { - push(out, Source.fromGraph(substreamSource.source)) - scheduleOnce(SubscriptionTimer, timeout) - substreamWaitingToBePushed = false - } - - override protected def onTimer(timerKey: Any): Unit = substreamSource.timeout(timeout) - - private class SubstreamHandler extends InHandler with OutHandler { - - var firstElem: T = null.asInstanceOf[T] - - def hasInitialElement: Boolean = firstElem.asInstanceOf[AnyRef] ne null - private var willCompleteAfterInitialElement = false - - // Substreams are always assumed to be pushable position when we enter this method - private def closeThis(handler: SubstreamHandler, currentElem: T): Unit = { - decision match { - case SplitAfter => - if (!substreamCancelled) { - substreamSource.push(currentElem) - substreamSource.complete() - } - case SplitBefore => - handler.firstElem = currentElem - if (!substreamCancelled) substreamSource.complete() + if (isAvailable(out)) { + if (decision == SplitBefore || handler.hasInitialElement) pushSubstreamSource() else pull(in) + } else substreamWaitingToBePushed = true } } - override def onPull(): Unit = { - cancelTimer(SubscriptionTimer) - if (hasInitialElement) { - substreamSource.push(firstElem) - firstElem = null.asInstanceOf[T] - setKeepGoing(false) - if (willCompleteAfterInitialElement) { + private def pushSubstreamSource(): Unit = { + push(out, Source.fromGraph(substreamSource.source)) + scheduleOnce(SubscriptionTimer, timeout) + substreamWaitingToBePushed = false + } + + override protected def onTimer(timerKey: Any): Unit = substreamSource.timeout(timeout) + + private class SubstreamHandler extends InHandler with OutHandler { + + var firstElem: T = null.asInstanceOf[T] + + def hasInitialElement: Boolean = firstElem.asInstanceOf[AnyRef] ne null + private var willCompleteAfterInitialElement = false + + // Substreams are always assumed to be pushable position when we enter this method + private def closeThis(handler: SubstreamHandler, currentElem: T): Unit = { + decision match { + case SplitAfter => + if (!substreamCancelled) { + substreamSource.push(currentElem) + substreamSource.complete() + } + case SplitBefore => + handler.firstElem = currentElem + if (!substreamCancelled) substreamSource.complete() + } + } + + override def onPull(): Unit = { + cancelTimer(SubscriptionTimer) + if (hasInitialElement) { + substreamSource.push(firstElem) + firstElem = null.asInstanceOf[T] + setKeepGoing(false) + if (willCompleteAfterInitialElement) { + substreamSource.complete() + completeStage() + } + } else pull(in) + } + + override def onDownstreamFinish(cause: Throwable): Unit = { + substreamCancelled = true + if (isClosed(in) || propagateSubstreamCancel(cause)) { + cancelStage(cause) + } else { + // Start draining + if (!hasBeenPulled(in)) pull(in) + } + } + + override def onPush(): Unit = { + val elem = grab(in) + try { + if (p(elem)) { + val handler = new SubstreamHandler + closeThis(handler, elem) + if (decision == SplitBefore) handOver(handler) + else { + substreamSource = null + setHandler(in, parent) + pull(in) + } + } else { + // Drain into the void + if (substreamCancelled) pull(in) + else substreamSource.push(elem) + } + } catch { + case NonFatal(ex) => onUpstreamFailure(ex) + } + } + + override def onUpstreamFinish(): Unit = + if (hasInitialElement) willCompleteAfterInitialElement = true + else { substreamSource.complete() completeStage() } - } else pull(in) - } - override def onDownstreamFinish(cause: Throwable): Unit = { - substreamCancelled = true - if (isClosed(in) || propagateSubstreamCancel(cause)) { - cancelStage(cause) - } else { - // Start draining - if (!hasBeenPulled(in)) pull(in) - } - } - - override def onPush(): Unit = { - val elem = grab(in) - try { - if (p(elem)) { - val handler = new SubstreamHandler - closeThis(handler, elem) - if (decision == SplitBefore) handOver(handler) - else { - substreamSource = null - setHandler(in, initInHandler) - pull(in) - } - } else { - // Drain into the void - if (substreamCancelled) pull(in) - else substreamSource.push(elem) - } - } catch { - case NonFatal(ex) => onUpstreamFailure(ex) - } - } - - override def onUpstreamFinish(): Unit = - if (hasInitialElement) willCompleteAfterInitialElement = true - else { - substreamSource.complete() - completeStage() + override def onUpstreamFailure(ex: Throwable): Unit = { + substreamSource.fail(ex) + failStage(ex) } - override def onUpstreamFailure(ex: Throwable): Unit = { - substreamSource.fail(ex) - failStage(ex) } - } - } override def toString: String = "Split" }