+str Add Flow.lazyInit (#24427) (#24527)

This commit is contained in:
gosubpl 2018-02-22 08:11:31 +01:00 committed by Patrik Nordwall
parent 80da4cadee
commit edc67e0c3f
7 changed files with 445 additions and 2 deletions

View file

@ -6,20 +6,21 @@ package akka.stream.impl.fusing
import java.util.concurrent.TimeUnit.NANOSECONDS
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.ExecutionContexts
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ ConstantFun, ReactiveStreamsCompliance, Stages, Buffer BufferImpl }
import akka.stream.scaladsl.{ Source, SourceQueue }
import akka.stream.scaladsl.{ Flow, Keep, Source, SourceQueue }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import scala.concurrent.Future
import scala.concurrent.{ Future, Promise }
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes.SupervisionStrategy
@ -1927,3 +1928,152 @@ private[stream] object Collect {
override def toString = "StatefulMapConcat"
}
/**
* INTERNAL API
*/
@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I Future[Flow[I, O, M]], zeroMat: () M)
extends GraphStageWithMaterializedValue[FlowShape[I, O], M] {
val in = Inlet[I]("lazyFlow.in")
val out = Outlet[O]("lazyFlow.out")
override def initialAttributes = DefaultAttributes.lazyFlow
override val shape: FlowShape[I, O] = FlowShape.of(in, out)
override def toString: String = "LazyFlow"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var completed = false
var matVal: Option[M] = None
val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
val subSink = new SubSinkInlet[O]("LazyFlowSubSink")
override def onPush(): Unit = {
try {
val element = grab(in)
val cb: AsyncCallback[Try[Flow[I, O, M]]] =
getAsyncCallback {
case Success(flow) initInternalSource(flow, element)
case Failure(e) failure(e)
}
flowFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext)
setHandler(in, new InHandler {
override def onPush(): Unit = throw new IllegalStateException("LazyFlow received push while waiting for flowFactory to complete.")
override def onUpstreamFinish(): Unit = gotCompletionEvent()
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
})
} catch {
case NonFatal(e) decider(e) match {
case Supervision.Stop failure(e)
case _ pull(in)
}
}
}
override def onPull(): Unit = {
pull(in)
subSink.pull()
setHandler(out, new OutHandler {
override def onPull(): Unit = {
subSink.pull()
}
override def onDownstreamFinish(): Unit = {
subSink.cancel()
completeStage()
}
})
subSink.setHandler(new InHandler {
override def onPush(): Unit = {
val elem = subSink.grab()
push(out, elem)
}
override def onUpstreamFinish(): Unit = {
completeStage()
}
})
}
setHandler(out, this)
private def failure(ex: Throwable): Unit = {
matVal = Some(zeroMat())
failStage(ex)
}
override def onUpstreamFinish(): Unit = {
matVal = Some(zeroMat())
completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
setHandler(in, this)
private def gotCompletionEvent(): Unit = {
setKeepGoing(true)
completed = true
}
private def initInternalSource(flow: Flow[I, O, M], firstElement: I): Unit = {
val sourceOut = new SubSourceOutlet[I]("LazyFlowSubSource")
def switchToFirstElementHandlers(): Unit = {
sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = {
sourceOut.push(firstElement)
if (completed) internalSourceComplete() else switchToFinalHandlers()
}
override def onDownstreamFinish(): Unit = internalSourceComplete()
})
setHandler(in, new InHandler {
override def onPush(): Unit = sourceOut.push(grab(in))
override def onUpstreamFinish(): Unit = gotCompletionEvent()
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
})
}
def switchToFinalHandlers(): Unit = {
sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = internalSourceComplete()
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
sourceOut.push(elem)
}
override def onUpstreamFinish(): Unit = internalSourceComplete()
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
})
}
def internalSourceComplete(): Unit = {
sourceOut.complete()
// normal completion, subSink.onUpstreamFinish will complete the stage
}
def internalSourceFailure(ex: Throwable): Unit = {
sourceOut.fail(ex)
failStage(ex)
}
switchToFirstElementHandlers()
try {
matVal = Some(Source.fromGraph(sourceOut.source)
.viaMat(flow)(Keep.right).toMat(subSink.sink)(Keep.left).run()(interpreter.subFusingMaterializer))
} catch {
case NonFatal(ex)
subSink.cancel()
matVal = Some(zeroMat())
failStage(ex)
}
}
}
(stageLogic, matVal.getOrElse(zeroMat()))
}
}