Replace LazyFlow with FutureFlow (#28729)

And implement other lazy flows on top of it

Co-Authored-By: Johannes Rudolph <johannes.rudolph@gmail.com>
This commit is contained in:
eyal farago 2020-05-14 11:28:53 +03:00 committed by GitHub
parent 9a1d5191b9
commit de59bb6803
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 1435 additions and 739 deletions

View file

@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import scala.concurrent.{ Future, Promise }
import scala.concurrent.Future
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.util.{ Failure, Success, Try }
import scala.util.control.{ NoStackTrace, NonFatal }
@ -19,7 +19,6 @@ import com.github.ghik.silencer.silent
import akka.actor.{ ActorRef, Terminated }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.ExecutionContexts
import akka.event.{ LogMarker, LogSource, Logging, LoggingAdapter, MarkerLoggingAdapter }
import akka.event.Logging.LogLevel
import akka.stream.{ Supervision, _ }
@ -29,7 +28,7 @@ import akka.stream.OverflowStrategies._
import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.{ DelayStrategy, Flow, Keep, Source }
import akka.stream.scaladsl.{ DelayStrategy, Source }
import akka.stream.stage._
import akka.util.OptionVal
import akka.util.unused
@ -2224,199 +2223,3 @@ private[stream] object Collect {
override def toString = "StatefulMapConcat"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class LazyFlow[I, O, M](flowFactory: I => Future[Flow[I, O, M]])
extends GraphStageWithMaterializedValue[FlowShape[I, O], Future[M]] {
// FIXME: when removing the deprecated I => Flow factories we can remove that complication from this stage
val in = Inlet[I]("LazyFlow.in")
val out = Outlet[O]("LazyFlow.out")
override def initialAttributes = DefaultAttributes.lazyFlow
override val shape: FlowShape[I, O] = FlowShape.of(in, out)
override def toString: String = "LazyFlow"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
val matPromise = Promise[M]()
val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
var switching = false
//
// implementation of handler methods in initial state
//
private def onFlowFutureComplete(firstElement: I)(result: Try[Flow[I, O, M]]) = result match {
case Success(flow) =>
// check if the stage is still in need for the lazy flow
// (there could have been an onUpstreamFailure or onDownstreamFinish in the meantime that has completed the promise)
if (!matPromise.isCompleted) {
try {
val mat = switchTo(flow, firstElement)
matPromise.success(mat)
} catch {
case NonFatal(e) =>
matPromise.failure(e)
failStage(e)
}
}
case Failure(e) =>
matPromise.failure(e)
failStage(e)
}
override def onPush(): Unit =
try {
val element = grab(in)
switching = true
val futureFlow = flowFactory(element)
// optimization avoid extra scheduling if already completed
futureFlow.value match {
case Some(completed) =>
onFlowFutureComplete(element)(completed)
case None =>
val cb = getAsyncCallback[Try[Flow[I, O, M]]](onFlowFutureComplete(element))
futureFlow.onComplete(cb.invoke)(ExecutionContexts.parasitic)
}
} catch {
case NonFatal(e) =>
matPromise.failure(e)
failStage(e)
}
override def onUpstreamFinish(): Unit = {
if (!matPromise.isCompleted)
matPromise.tryFailure(new NeverMaterializedException)
// ignore onUpstreamFinish while the stage is switching but setKeepGoing
if (switching) {
setKeepGoing(true)
} else {
super.onUpstreamFinish()
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
super.onUpstreamFailure(ex)
if (!matPromise.isCompleted)
matPromise.tryFailure(new NeverMaterializedException(ex))
}
override def onPull(): Unit = {
pull(in)
}
override def postStop(): Unit = {
if (!matPromise.isCompleted)
matPromise.tryFailure(new AbruptStageTerminationException(this))
}
setHandler(in, this)
setHandler(out, this)
private def switchTo(flow: Flow[I, O, M], firstElement: I): M = {
//
// ports are wired in the following way:
//
// in ~> subOutlet ~> lazyFlow ~> subInlet ~> out
//
val subInlet = new SubSinkInlet[O]("LazyFlowSubSink")
val subOutlet = new SubSourceOutlet[I]("LazyFlowSubSource")
val matVal = Source
.fromGraph(subOutlet.source)
.prepend(Source.single(firstElement))
.viaMat(flow)(Keep.right)
.toMat(subInlet.sink)(Keep.left)
.run()(interpreter.subFusingMaterializer)
// The lazily materialized flow may be constructed from a sink and a source. Therefore termination
// signals (completion, cancellation, and errors) are not guaranteed to pass through the flow. This
// means that this stage must not be completed as soon as one side of the flow is finished.
//
// Invariant: isClosed(out) == subInlet.isClosed after each event because termination signals (i.e.
// completion, cancellation, and failure) between these two ports are always forwarded.
//
// However, isClosed(in) and subOutlet.isClosed may be different. This happens if upstream completes before
// the cached element was pushed.
def maybeCompleteStage(): Unit = {
if (isClosed(in) && subOutlet.isClosed && isClosed(out)) {
completeStage()
}
}
// The stage must not be shut down automatically; it is completed when maybeCompleteStage decides
setKeepGoing(true)
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
subOutlet.push(grab(in))
}
override def onUpstreamFinish(): Unit = {
subOutlet.complete()
maybeCompleteStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
// propagate exception irrespective if the cached element has been pushed or not
subOutlet.fail(ex)
maybeCompleteStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
subInlet.pull()
}
override def onDownstreamFinish(cause: Throwable): Unit = {
subInlet.cancel(cause)
maybeCompleteStage()
}
})
subOutlet.setHandler(new OutHandler {
override def onPull(): Unit = {
pull(in)
}
override def onDownstreamFinish(cause: Throwable): Unit = {
if (!isClosed(in)) {
cancel(in, cause)
}
maybeCompleteStage()
}
})
subInlet.setHandler(new InHandler {
override def onPush(): Unit = {
push(out, subInlet.grab())
}
override def onUpstreamFinish(): Unit = {
complete(out)
maybeCompleteStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
fail(out, ex)
maybeCompleteStage()
}
})
if (isClosed(out)) {
// downstream may have been canceled while the stage was switching
subInlet.cancel()
} else {
subInlet.pull()
}
matVal
}
}
(stageLogic, matPromise.future)
}
}