materialized value of Flow.lazyInit must be a Future #24670 (#24685)

* change materialized value of LazyFlow from [M] to Future[Option[M]]

* remove whitespace

* improve docu

* restore old Flow.lazyInit method and add new Flow.lazyInitAsync method

* fix deprecation messages

* add 2.5.11.backwards.excludes because of changed LazyFlow constructor signature

* check switching behaviour

* apply formatting

* improve deprecation message; improve null safety

* prevent premature stage completion by setting keepGoing

* deprecate Sink.lazyInit; add Sink.lazyInitAsync

* apply formatting

* add ProblemFilter.exclude for changed LazySink.this

* Update Sink.scala

* Update Sink.scala

* Update Flow.scala

* Update Flow.scala
This commit is contained in:
Stefan Wachter 2018-03-19 14:42:37 +01:00 committed by Konrad `ktoso` Malawski
parent 770b3a3474
commit 3db145643a
12 changed files with 454 additions and 383 deletions

View file

@ -22,7 +22,7 @@ import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import scala.concurrent.Future
import scala.concurrent.{ Future, Promise }
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes.SupervisionStrategy
@ -362,8 +362,7 @@ private[stream] object Collect {
override def toString: String = "Scan"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
self
new GraphStageLogic(shape) with InHandler with OutHandler { self
private var aggregator = zero
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
@ -425,8 +424,7 @@ private[stream] object Collect {
override val toString: String = "ScanAsync"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
self
new GraphStageLogic(shape) with InHandler with OutHandler { self
private var current: Out = zero
private var eventualCurrent: Future[Out] = Future.successful(current)
@ -1412,7 +1410,8 @@ private[stream] object Collect {
if (isEnabled(logLevels.onFailure))
logLevels.onFailure match {
case Logging.ErrorLevel log.error(cause, "[{}] Upstream failed.", name)
case level log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage)
case level log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause
.getClass), cause.getMessage)
}
super.onUpstreamFailure(cause)
@ -1463,7 +1462,8 @@ private[stream] object Collect {
private final val DefaultLoggerName = "akka.stream.Log"
private final val OffInt = LogLevels.Off.asInt
private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel)
private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging
.DebugLevel, onFailure = Logging.ErrorLevel)
}
/**
@ -1482,6 +1482,7 @@ private[stream] object Collect {
@InternalApi private[akka] object GroupedWeightedWithin {
val groupedWeightedWithinTimer = "GroupedWeightedWithinTimer"
}
/**
* INTERNAL API
*/
@ -1609,6 +1610,7 @@ private[stream] object Collect {
if (isAvailable(out)) emitGroup()
else pushEagerly = true
}
setHandlers(in, out, this)
}
}
@ -1792,8 +1794,7 @@ private[stream] object Collect {
@InternalApi private[akka] final class Reduce[T](val f: (T, T) T) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.reduce
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
self
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { self
override def toString = s"Reduce.Logic(aggregator=$aggregator)"
var aggregator: T = _
@ -1962,148 +1963,197 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I Future[Flow[I, O, M]], zeroMat: () M)
extends GraphStageWithMaterializedValue[FlowShape[I, O], M] {
@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I Future[Flow[I, O, M]])
extends GraphStageWithMaterializedValue[FlowShape[I, O], Future[Option[M]]] {
val in = Inlet[I]("lazyFlow.in")
val out = Outlet[O]("lazyFlow.out")
override def initialAttributes = DefaultAttributes.lazyFlow
override val shape: FlowShape[I, O] = FlowShape.of(in, out)
override def toString: String = "LazyFlow"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var completed = false
var matVal: Option[M] = None
val matPromise = Promise[Option[M]]()
val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
val subSink = new SubSinkInlet[O]("LazyFlowSubSink")
var switching = false
//
// implementation of handler methods in initial state
//
override def onPush(): Unit = {
try {
val element = grab(in)
val cb: AsyncCallback[Try[Flow[I, O, M]]] =
getAsyncCallback {
case Success(flow) initInternalSource(flow, element)
case Failure(e) failure(e)
val element = grab(in)
switching = true
val cb = getAsyncCallback[Try[Flow[I, O, M]]] {
case Success(flow)
// check if the stage is still in need for the lazy flow
// (there could have been an onUpstreamFailure or onDownstreamFinish in the meantime that has completed the promise)
if (!matPromise.isCompleted) {
try {
val mat = switchTo(flow, element)
matPromise.success(Some(mat))
} catch {
case NonFatal(e)
matPromise.failure(e)
failStage(e)
}
}
flowFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext)
setHandler(in, new InHandler {
override def onPush(): Unit = throw new IllegalStateException("LazyFlow received push while waiting for flowFactory to complete.")
override def onUpstreamFinish(): Unit = gotCompletionEvent()
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
})
} catch {
case NonFatal(e) decider(e) match {
case Supervision.Stop failure(e)
case _ pull(in)
}
case Failure(e)
matPromise.failure(e)
failStage(e)
}
try {
flowFactory(element).onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext)
} catch {
case NonFatal(e)
matPromise.failure(e)
failStage(e)
}
}
override def onUpstreamFinish(): Unit = {
// ignore onUpstreamFinish while the stage is switching but setKeepGoing
if (switching) {
setKeepGoing(true)
} else {
matPromise.success(None)
super.onUpstreamFinish()
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
matPromise.failure(ex)
super.onUpstreamFailure(ex)
}
override def onDownstreamFinish(): Unit = {
matPromise.success(None)
super.onDownstreamFinish()
}
override def onPull(): Unit = {
pull(in)
subSink.pull()
}
setHandler(in, this)
setHandler(out, this)
private def switchTo(flow: Flow[I, O, M], firstElement: I): M = {
var firstElementPushed = false
//
// ports are wired in the following way:
//
// in ~> subOutlet ~> lazyFlow ~> subInlet ~> out
//
val subInlet = new SubSinkInlet[O]("LazyFlowSubSink")
val subOutlet = new SubSourceOutlet[I]("LazyFlowSubSource")
val matVal = Source.fromGraph(subOutlet.source)
.viaMat(flow)(Keep.right)
.toMat(subInlet.sink)(Keep.left)
.run()(interpreter.subFusingMaterializer)
// The lazily materialized flow may be constructed from a sink and a source. Therefore termination
// signals (completion, cancellation, and errors) are not guaranteed to pass through the flow. This
// means that this stage must not be completed as soon as one side of the flow is finished.
//
// Invariant: isClosed(out) == subInlet.isClosed after each event because termination signals (i.e.
// completion, cancellation, and failure) between these two ports are always forwarded.
//
// However, isClosed(in) and subOutlet.isClosed may be different. This happens if upstream completes before
// the cached element was pushed.
def maybeCompleteStage(): Unit = {
if (isClosed(in) && subOutlet.isClosed && isClosed(out)) {
completeStage()
}
}
// The stage must not be shut down automatically; it is completed when maybeCompleteStage decides
setKeepGoing(true)
setHandler(in, new InHandler {
override def onPush(): Unit = {
subOutlet.push(grab(in))
}
override def onUpstreamFinish(): Unit = {
if (firstElementPushed) {
subOutlet.complete()
maybeCompleteStage()
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
// propagate exception irrespective if the cached element has been pushed or not
subOutlet.fail(ex)
maybeCompleteStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
subSink.pull()
subInlet.pull()
}
override def onDownstreamFinish(): Unit = {
subSink.cancel()
completeStage()
subInlet.cancel()
maybeCompleteStage()
}
})
subSink.setHandler(new InHandler {
subOutlet.setHandler(new OutHandler {
override def onPull(): Unit = {
if (firstElementPushed) {
pull(in)
} else {
// the demand can be satisfied right away by the cached element
firstElementPushed = true
subOutlet.push(firstElement)
// in.onUpstreamFinished was not propagated if it arrived before the cached element was pushed
// -> check if the completion must be propagated now
if (isClosed(in)) {
subOutlet.complete()
maybeCompleteStage()
}
}
}
override def onDownstreamFinish(): Unit = {
if (!isClosed(in)) {
cancel(in)
}
maybeCompleteStage()
}
})
subInlet.setHandler(new InHandler {
override def onPush(): Unit = {
val elem = subSink.grab()
push(out, elem)
push(out, subInlet.grab())
}
override def onUpstreamFinish(): Unit = {
completeStage()
complete(out)
maybeCompleteStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
fail(out, ex)
maybeCompleteStage()
}
})
}
setHandler(out, this)
private def failure(ex: Throwable): Unit = {
matVal = Some(zeroMat())
failStage(ex)
}
override def onUpstreamFinish(): Unit = {
matVal = Some(zeroMat())
completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
setHandler(in, this)
private def gotCompletionEvent(): Unit = {
setKeepGoing(true)
completed = true
}
private def initInternalSource(flow: Flow[I, O, M], firstElement: I): Unit = {
val sourceOut = new SubSourceOutlet[I]("LazyFlowSubSource")
def switchToFirstElementHandlers(): Unit = {
sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = {
sourceOut.push(firstElement)
if (completed) internalSourceComplete() else switchToFinalHandlers()
}
override def onDownstreamFinish(): Unit = internalSourceComplete()
})
setHandler(in, new InHandler {
override def onPush(): Unit = sourceOut.push(grab(in))
override def onUpstreamFinish(): Unit = gotCompletionEvent()
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
})
if (isClosed(out)) {
// downstream may have been canceled while the stage was switching
subInlet.cancel()
} else {
subInlet.pull()
}
def switchToFinalHandlers(): Unit = {
sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = internalSourceComplete()
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
sourceOut.push(elem)
}
override def onUpstreamFinish(): Unit = internalSourceComplete()
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
})
}
def internalSourceComplete(): Unit = {
sourceOut.complete()
// normal completion, subSink.onUpstreamFinish will complete the stage
}
def internalSourceFailure(ex: Throwable): Unit = {
sourceOut.fail(ex)
failStage(ex)
}
switchToFirstElementHandlers()
try {
matVal = Some(Source.fromGraph(sourceOut.source)
.viaMat(flow)(Keep.right).toMat(subSink.sink)(Keep.left).run()(interpreter.subFusingMaterializer))
} catch {
case NonFatal(ex)
subSink.cancel()
matVal = Some(zeroMat())
failStage(ex)
}
matVal
}
}
(stageLogic, matVal.getOrElse(zeroMat()))
(stageLogic, matPromise.future)
}
}