Align lazy and future operators #26446

This commit is contained in:
Johan Andrén 2019-10-16 17:02:12 +02:00 committed by GitHub
parent 4a72985e48
commit 74adecb4e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
59 changed files with 2091 additions and 384 deletions

View file

@ -2080,10 +2080,13 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I => Future[Flow[I, O, M]])
extends GraphStageWithMaterializedValue[FlowShape[I, O], Future[Option[M]]] {
val in = Inlet[I]("lazyFlow.in")
val out = Outlet[O]("lazyFlow.out")
@InternalApi private[akka] final class LazyFlow[I, O, M](flowFactory: I => Future[Flow[I, O, M]])
extends GraphStageWithMaterializedValue[FlowShape[I, O], Future[M]] {
// FIXME: when removing the deprecated I => Flow factories we can remove that complication from this stage
val in = Inlet[I]("LazyFlow.in")
val out = Outlet[O]("LazyFlow.out")
override def initialAttributes = DefaultAttributes.lazyFlow
@ -2091,78 +2094,84 @@ private[stream] object Collect {
override def toString: String = "LazyFlow"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val matPromise = Promise[Option[M]]()
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
val matPromise = Promise[M]()
val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
var switching = false
//
// implementation of handler methods in initial state
//
override def onPush(): Unit = {
val element = grab(in)
switching = true
val cb = getAsyncCallback[Try[Flow[I, O, M]]] {
case Success(flow) =>
// check if the stage is still in need for the lazy flow
// (there could have been an onUpstreamFailure or onDownstreamFinish in the meantime that has completed the promise)
if (!matPromise.isCompleted) {
try {
val mat = switchTo(flow, element)
matPromise.success(Some(mat))
} catch {
case NonFatal(e) =>
matPromise.failure(e)
failStage(e)
}
private def onFlowFutureComplete(firstElement: I)(result: Try[Flow[I, O, M]]) = result match {
case Success(flow) =>
// check if the stage is still in need for the lazy flow
// (there could have been an onUpstreamFailure or onDownstreamFinish in the meantime that has completed the promise)
if (!matPromise.isCompleted) {
try {
val mat = switchTo(flow, firstElement)
matPromise.success(mat)
} catch {
case NonFatal(e) =>
matPromise.failure(e)
failStage(e)
}
case Failure(e) =>
matPromise.failure(e)
failStage(e)
}
}
case Failure(e) =>
matPromise.failure(e)
failStage(e)
}
override def onPush(): Unit =
try {
flowFactory(element).onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext)
val element = grab(in)
switching = true
val futureFlow = flowFactory(element)
// optimization avoid extra scheduling if already completed
futureFlow.value match {
case Some(completed) =>
onFlowFutureComplete(element)(completed)
case None =>
val cb = getAsyncCallback[Try[Flow[I, O, M]]](onFlowFutureComplete(element))
futureFlow.onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext)
}
} catch {
case NonFatal(e) =>
matPromise.failure(e)
failStage(e)
}
}
override def onUpstreamFinish(): Unit = {
if (!matPromise.isCompleted)
matPromise.tryFailure(new NeverMaterializedException)
// ignore onUpstreamFinish while the stage is switching but setKeepGoing
if (switching) {
setKeepGoing(true)
} else {
matPromise.success(None)
super.onUpstreamFinish()
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
matPromise.failure(ex)
super.onUpstreamFailure(ex)
}
override def onDownstreamFinish(cause: Throwable): Unit = {
matPromise.success(None)
super.onDownstreamFinish(cause)
if (!matPromise.isCompleted)
matPromise.tryFailure(new NeverMaterializedException(ex))
}
override def onPull(): Unit = {
pull(in)
}
override def postStop(): Unit = {
if (!matPromise.isCompleted)
matPromise.tryFailure(new AbruptStageTerminationException(this))
}
setHandler(in, this)
setHandler(out, this)
private def switchTo(flow: Flow[I, O, M], firstElement: I): M = {
var firstElementPushed = false
//
// ports are wired in the following way:
//
@ -2174,6 +2183,7 @@ private[stream] object Collect {
val matVal = Source
.fromGraph(subOutlet.source)
.prepend(Source.single(firstElement))
.viaMat(flow)(Keep.right)
.toMat(subInlet.sink)(Keep.left)
.run()(interpreter.subFusingMaterializer)
@ -2203,10 +2213,8 @@ private[stream] object Collect {
subOutlet.push(grab(in))
}
override def onUpstreamFinish(): Unit = {
if (firstElementPushed) {
subOutlet.complete()
maybeCompleteStage()
}
subOutlet.complete()
maybeCompleteStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
// propagate exception irrespective if the cached element has been pushed or not
@ -2227,19 +2235,7 @@ private[stream] object Collect {
subOutlet.setHandler(new OutHandler {
override def onPull(): Unit = {
if (firstElementPushed) {
pull(in)
} else {
// the demand can be satisfied right away by the cached element
firstElementPushed = true
subOutlet.push(firstElement)
// in.onUpstreamFinished was not propagated if it arrived before the cached element was pushed
// -> check if the completion must be propagated now
if (isClosed(in)) {
subOutlet.complete()
maybeCompleteStage()
}
}
pull(in)
}
override def onDownstreamFinish(cause: Throwable): Unit = {
if (!isClosed(in)) {