+str #17290 Adds preStart / postStop to AbstractStage

+ AsyncStage now uses preStart
+ Log stage now uses preStart (better perf), can be optimised more
This commit is contained in:
Konrad Malawski 2015-05-11 00:09:59 +02:00
parent f24f43895d
commit 9607ec0cdf
10 changed files with 437 additions and 69 deletions
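For context, the new hooks surface on user-facing stages such as PushStage. Below is a minimal sketch of what overriding them looks like, written against the Context / PushStage API shown in the diff; the stage name InstrumentedIdentity is made up for illustration:

import akka.stream.stage.{ Context, PushStage, SyncDirective }

// Hypothetical identity stage that observes its own lifecycle.
final case class InstrumentedIdentity[T]() extends PushStage[T, T] {

  // preStart runs once, before any other callback, when the stream is materialized.
  override def preStart(ctx: Context[T]): Unit =
    println(s"starting, attributes: ${ctx.attributes}")

  override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem)

  // postStop runs after the stage's final action (completion, failure or
  // cancellation); exceptions thrown here are logged, not propagated.
  override def postStop(): Unit =
    println("stopped")
}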


@@ -0,0 +1,121 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean
import akka.stream.Supervision._
import akka.stream._
import akka.stream.impl.fusing.{ InterpreterLifecycleSpecKit, ActorInterpreter }
import akka.stream.stage.Stage
import akka.stream.testkit.Utils.TE
import akka.stream.testkit.{ AkkaSpec, _ }
import scala.concurrent.duration._
class ActorInterpreterLifecycleSpec extends AkkaSpec with InterpreterLifecycleSpecKit {
implicit val mat = ActorFlowMaterializer()
class Setup(ops: List[Stage[_, _]] = List(fusing.Map({ x: Any ⇒ x }, stoppingDecider))) {
val up = TestPublisher.manualProbe[Int]
val down = TestSubscriber.manualProbe[Int]
private val props = ActorInterpreter.props(mat.settings, ops, mat).withDispatcher("akka.test.stream-dispatcher")
val actor = system.actorOf(props)
val processor = ActorProcessorFactory[Int, Int](actor)
}
"An ActorInterpreter" must {
"call preStart in order on stages" in new Setup(List(
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a"),
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-b"),
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-c"))) {
processor.subscribe(down)
val sub = down.expectSubscription()
sub.cancel()
up.subscribe(processor)
val upsub = up.expectSubscription()
upsub.expectCancellation()
expectMsg("start-a")
expectMsg("start-b")
expectMsg("start-c")
}
"call postStart in order on stages - when upstream completes" in new Setup(List(
PreStartAndPostStopIdentity(onStop = () testActor ! "stop-a"),
PreStartAndPostStopIdentity(onStop = () testActor ! "stop-b"),
PreStartAndPostStopIdentity(onStop = () testActor ! "stop-c"))) {
processor.subscribe(down)
val sub = down.expectSubscription()
up.subscribe(processor)
val upsub = up.expectSubscription()
upsub.sendComplete()
down.expectComplete()
expectMsg("stop-a")
expectMsg("stop-b")
expectMsg("stop-c")
}
"call postStart in order on stages - when downstream cancels" in new Setup(List(
PreStartAndPostStopIdentity(onStop = () testActor ! "stop-a"),
PreStartAndPostStopIdentity(onStop = () testActor ! "stop-b"),
PreStartAndPostStopIdentity(onStop = () testActor ! "stop-c"))) {
processor.subscribe(down)
val sub = down.expectSubscription()
sub.cancel()
up.subscribe(processor)
val upsub = up.expectSubscription()
upsub.expectCancellation()
expectMsg("stop-c")
expectMsg("stop-b")
expectMsg("stop-a")
}
"onError downstream when preStart fails" in new Setup(List(
PreStartFailer(() ⇒ throw TE("Boom!")))) {
processor.subscribe(down)
val sub = down.expectSubscription()
up.subscribe(processor)
val upsub = up.expectSubscription()
down.expectError(TE("Boom!"))
}
"onError only once even with Supervision.restart" in new Setup(List(
PreStartFailer(() ⇒ throw TE("Boom!")))) {
processor.subscribe(down)
val sub = down.expectSubscription()
up.subscribe(processor)
val upsub = up.expectSubscription()
down.expectError(TE("Boom!"))
down.expectNoMsg(1.second)
}
"onError downstream when preStart fails with 'most downstream' failure, when multiple stages fail" in new Setup(List(
PreStartFailer(() ⇒ throw TE("Boom 1!")),
PreStartFailer(() ⇒ throw TE("Boom 2!")),
PreStartFailer(() ⇒ throw TE("Boom 3!")))) {
processor.subscribe(down)
val sub = down.expectSubscription()
up.subscribe(processor)
val upsub = up.expectSubscription()
down.expectError(TE("Boom 3!"))
down.expectNoMsg(300.millis)
}
"continue with stream shutdown when postStop fails" in new Setup(List(
PostStopFailer(() ⇒ throw TE("Boom!")))) {
processor.subscribe(down)
val sub = down.expectSubscription()
up.subscribe(processor)
val upsub = up.expectSubscription()
upsub.sendComplete()
down.expectComplete() // failures in postStop are logged, but not propagated // TODO Future features? make this a setting?
}
}
}
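For orientation, a lifecycle-aware stage such as the PreStartAndPostStopIdentity used above (or the InstrumentedIdentity sketched after the commit summary) is normally attached to a stream with the transform operator, so preStart fires at materialization and postStop when the stream shuts down. A minimal usage sketch, assuming the scaladsl API of this Akka Streams milestone (Source.apply on a range, transform, Sink.foreach):

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system = ActorSystem("lifecycle-demo") // demo-only actor system
implicit val mat = ActorFlowMaterializer()

// preStart of the stage runs when this stream is materialized,
// postStop when it completes, fails or is cancelled.
Source(1 to 3)
  .transform(() ⇒ InstrumentedIdentity[Int]())
  .runWith(Sink.foreach(println))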


@@ -3,15 +3,10 @@
*/
package akka.stream.impl.fusing
-import akka.stream.stage.{ TerminationDirective, SyncDirective, Context, PushStage }
-import scala.util.control.NoStackTrace
import akka.stream.Supervision
class InterpreterSpec extends InterpreterSpecKit {
import Supervision.stoppingDecider
-import Supervision.resumingDecider
-import Supervision.restartingDecider
"Interpreter" must {
@@ -462,13 +457,6 @@ class InterpreterSpec extends InterpreterSpecKit {
}
-// This test is related to issue #17351
-class PushFinishStage extends PushStage[Any, Any] {
-override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pushAndFinish(elem)
-override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective =
-ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen"))
-}
"work with pushAndFinish if upstream completes with pushAndFinish" in new TestSetup(Seq(
new PushFinishStage)) {


@@ -3,13 +3,64 @@
*/
package akka.stream.impl.fusing
-import akka.stream.OperationAttributes
-import akka.stream.testkit.AkkaSpec
-import akka.stream.ActorFlowMaterializer
import akka.event.Logging
import akka.stream.stage._
import akka.stream.testkit.AkkaSpec
import akka.stream.{ ActorFlowMaterializer, OperationAttributes }
import akka.testkit.TestProbe
-trait InterpreterSpecKit extends AkkaSpec {
trait InterpreterLifecycleSpecKit {
private[akka] case class PreStartAndPostStopIdentity[T](
onStart: LifecycleContext ⇒ Unit = _ ⇒ (),
onStop: () ⇒ Unit = () ⇒ (),
onUpstreamCompleted: () ⇒ Unit = () ⇒ (),
onUpstreamFailed: Throwable ⇒ Unit = ex ⇒ ())
extends PushStage[T, T] {
override def preStart(ctx: Context[T]) = onStart(ctx)
override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem)
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
onUpstreamCompleted()
super.onUpstreamFinish(ctx)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
onUpstreamFailed(cause)
super.onUpstreamFailure(cause, ctx)
}
override def postStop() = onStop()
}
private[akka] case class PreStartFailer[T](pleaseThrow: () ⇒ Unit) extends PushStage[T, T] {
override def preStart(ctx: Context[T]) = pleaseThrow()
override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem)
}
private[akka] case class PostStopFailer[T](ex: () ⇒ Throwable) extends PushStage[T, T] {
override def onUpstreamFinish(ctx: Context[T]) = ctx.finish()
override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem)
override def postStop(): Unit = throw ex()
}
// This test is related to issue #17351
private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends PushStage[Any, Any] {
override def onPush(elem: Any, ctx: Context[Any]): SyncDirective =
ctx.pushAndFinish(elem)
override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective =
ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen"))
override def postStop(): Unit =
onPostStop()
}
}
trait InterpreterSpecKit extends AkkaSpec with InterpreterLifecycleSpecKit {
case object OnComplete
case object Cancel
@@ -61,6 +112,7 @@ trait InterpreterSpecKit extends AkkaSpec {
val sidechannel = TestProbe()
val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream,
(op, ctx, event) ⇒ sidechannel.ref ! ActorInterpreter.AsyncInput(op, ctx, event),
Logging(system, classOf[TestSetup]),
ActorFlowMaterializer(),
OperationAttributes.none,
forkLimit, overflowToHeap)


@@ -0,0 +1,141 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
import akka.stream.testkit.Utils.TE
import scala.concurrent.duration._
class LifecycleInterpreterSpec extends InterpreterSpecKit {
import akka.stream.Supervision._
"Interpreter" must {
"call preStart in order on stages" in new TestSetup(Seq(
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a"),
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-b"),
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-c"))) {
expectMsg("start-a")
expectMsg("start-b")
expectMsg("start-c")
expectNoMsg(300.millis)
upstream.onComplete()
}
"call postStop in order on stages - when upstream completes" in new TestSetup(Seq(
PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-a", onStop = () ⇒ testActor ! "stop-a"),
PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-b", onStop = () ⇒ testActor ! "stop-b"),
PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-c", onStop = () ⇒ testActor ! "stop-c"))) {
upstream.onComplete()
expectMsg("complete-a")
expectMsg("stop-a")
expectMsg("complete-b")
expectMsg("stop-b")
expectMsg("complete-c")
expectMsg("stop-c")
expectNoMsg(300.millis)
}
"call postStop in order on stages - when upstream onErrors" in new TestSetup(Seq(
PreStartAndPostStopIdentity(
onUpstreamFailed = ex ⇒ testActor ! ex.getMessage,
onStop = () ⇒ testActor ! "stop-c"))) {
val msg = "Boom! Boom! Boom!"
upstream.onError(TE(msg))
expectMsg(msg)
expectMsg("stop-c")
expectNoMsg(300.millis)
}
"call postStop in order on stages - when downstream cancels" in new TestSetup(Seq(
PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-a"),
PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-b"),
PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-c"))) {
downstream.cancel()
expectMsg("stop-c")
expectMsg("stop-b")
expectMsg("stop-a")
expectNoMsg(300.millis)
}
"call preStart before postStop" in new TestSetup(Seq(
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a", onStop = () ⇒ testActor ! "stop-a"))) {
expectMsg("start-a")
expectNoMsg(300.millis)
upstream.onComplete()
expectMsg("stop-a")
expectNoMsg(300.millis)
}
"onError when preStart fails" in new TestSetup(Seq(
PreStartFailer(() ⇒ throw TE("Boom!")))) {
lastEvents() should ===(Set(Cancel, OnError(TE("Boom!"))))
}
"not blow up when postStop fails" in new TestSetup(Seq(
PostStopFailer(() ⇒ throw TE("Boom!")))) {
upstream.onComplete()
lastEvents() should ===(Set(OnComplete))
}
"onError when preStart fails with stages after" in new TestSetup(Seq(
Map((x: Int) ⇒ x, stoppingDecider),
PreStartFailer(() ⇒ throw TE("Boom!")),
Map((x: Int) ⇒ x, stoppingDecider))) {
lastEvents() should ===(Set(Cancel, OnError(TE("Boom!"))))
}
"continue with stream shutdown when postStop fails" in new TestSetup(Seq(
PostStopFailer(() ⇒ throw TE("Boom!")))) {
lastEvents() should ===(Set())
upstream.onComplete()
lastEvents() should ===(Set(OnComplete))
}
"postStop when pushAndFinish called if upstream completes with pushAndFinish" in new TestSetup(Seq(
new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNextAndComplete("foo")
lastEvents() should be(Set(OnNext("foo"), OnComplete))
expectMsg("stop")
}
"postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new TestSetup(Seq(
Map((x: Any) ⇒ x, stoppingDecider),
new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"),
Map((x: Any) ⇒ x, stoppingDecider))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNextAndComplete("foo")
lastEvents() should be(Set(OnNext("foo"), OnComplete))
expectMsg("stop")
}
"postStop when pushAndFinish called with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new TestSetup(Seq(
new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"),
Fold("", (x: String, y: String) ⇒ x + y, stoppingDecider))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNextAndComplete("foo")
lastEvents() should be(Set(OnNext("foo"), OnComplete))
expectMsg("stop")
}
}
}


@@ -59,7 +59,7 @@ object TlsSpec {
class Timeout(duration: FiniteDuration)(implicit system: ActorSystem) extends AsyncStage[ByteString, ByteString, Unit] {
private var last: ByteString = _
-override def initAsyncInput(ctx: AsyncContext[ByteString, Unit]) = {
override def preStart(ctx: AsyncContext[ByteString, Unit]) = {
val cb = ctx.getAsyncCallback()
system.scheduler.scheduleOnce(duration)(cb.invoke(()))(system.dispatcher)
}


@@ -4,18 +4,16 @@
package akka.stream.impl.fusing
import java.util.Arrays
-import akka.actor.{ Actor, ActorRef }
import akka.actor._
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
import akka.stream.impl._
import akka.stream.OperationAttributes
import akka.stream.impl.fusing.OneBoundedInterpreter.{ InitializationFailed, InitializationFailure, InitializationSuccessful }
import akka.stream.stage._
import org.reactivestreams.{ Subscriber, Subscription }
-import akka.actor.Props
-import akka.actor.ActorLogging
import akka.event.{ Logging, LoggingAdapter }
-import akka.actor.DeadLetterSuppression
import akka.stream.ActorFlowMaterializer
/**
@@ -181,6 +179,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef,
private var downstreamCompleted = false
// this is true while we hold the ball; while false incoming demand will just be queued up
private var upstreamWaiting = true
// when upstream failed before we got the exposed publisher
private var upstreamFailed: Option[Throwable] = None
// the number of elements emitted during a single execution is bounded
private var burstRemaining = outputBurstLimit
@ -227,6 +227,9 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef,
log.debug("fail due to: {}", e.getMessage)
if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e)
} else if (exposedPublisher == null && upstreamFailed.isEmpty) {
// fail called before the exposed publisher arrived, we must store it and fail when we're first able to
upstreamFailed = Some(e)
}
}
@ -261,8 +264,13 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef,
protected def waitingExposedPublisher: Actor.Receive = {
case ExposedPublisher(publisher) ⇒
-exposedPublisher = publisher
-subreceive.become(downstreamRunning)
upstreamFailed match {
case _: Some[_] ⇒
publisher.shutdown(upstreamFailed)
case _ ⇒
exposedPublisher = publisher
subreceive.become(downstreamRunning)
}
case other ⇒
throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]")
}
@@ -319,11 +327,18 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings
private val interpreter =
new OneBoundedInterpreter(upstream +: ops :+ downstream,
(op, ctx, event) ⇒ self ! AsyncInput(op, ctx, event),
Logging(this),
materializer,
attributes,
name = context.self.path.toString)
-interpreter.init()
interpreter.init() match {
case failed: InitializationFailed ⇒
// the Actor will be stopped thanks to aroundReceive checking interpreter.isFinished
upstream.setDownstreamCanceled()
downstream.fail(failed.mostDownstream.ex)
case InitializationSuccessful ⇒ // ok
}
def receive: Receive =
upstream.subreceive


@@ -3,13 +3,13 @@
*/
package akka.stream.impl.fusing
-import akka.stream.{ FlowMaterializer, Supervision }
import akka.event.LoggingAdapter
import akka.stream.impl.ReactiveStreamsCompliance
-import akka.stream.OperationAttributes
import akka.stream.stage._
import akka.stream.{ FlowMaterializer, OperationAttributes, Supervision }
import scala.annotation.{ switch, tailrec }
-import scala.collection.breakOut
import scala.collection.{ breakOut, immutable }
import scala.util.control.NonFatal
/**
@@ -18,6 +18,20 @@ import scala.util.control.NonFatal
private[akka] object OneBoundedInterpreter {
final val Debug = false
/** INTERNAL API */
private[akka] sealed trait InitializationStatus
/** INTERNAL API */
private[akka] final case object InitializationSuccessful extends InitializationStatus
/** INTERNAL API */
private[akka] final case class InitializationFailed(failures: immutable.Seq[InitializationFailure]) extends InitializationStatus {
// exceptions are reverse ordered here; the methods below help to avoid confusion when used from the outside
def mostUpstream = failures.last
def mostDownstream = failures.head
}
/** INTERNAL API */
private[akka] case class InitializationFailure(op: Int, ex: Throwable)
/**
* INTERNAL API
*
@ -138,6 +152,7 @@ private[akka] object OneBoundedInterpreter {
*/
private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
onAsyncInput: (AsyncStage[Any, Any, Any], AsyncContext[Any, Any], Any) ⇒ Unit,
log: LoggingAdapter,
materializer: FlowMaterializer,
attributes: OperationAttributes = OperationAttributes.none,
val forkLimit: Int = 100,
@ -312,7 +327,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
def isFinishing: Boolean = hasBits(TerminationPending)
-final protected def pushAndFinishCommon(elem: Any): Unit = {
final protected def pushAndFinishCommon(elem: Any, finishState: UntypedOp): Unit = {
finishCurrentOp(finishState)
ReactiveStreamsCompliance.requireNonNullElement(elem)
if (currentOp.isDetached) {
mustHave(DownstreamBall)
@ -321,9 +337,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
}
override def pushAndFinish(elem: Any): DownstreamDirective = {
-pushAndFinishCommon(elem)
-// Spit the execution domain in two, and invoke op postStop callbacks if there are any
-finishCurrentOp()
// Split the execution domain in two and invoke postStop
pushAndFinishCommon(elem, Finished.asInstanceOf[UntypedOp])
// This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution
// path. Other forks are not order dependent because they execute on isolated execution domains which cannot
@ -422,11 +437,12 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this)
override def pushAndFinish(elem: Any): DownstreamDirective = {
-pushAndFinishCommon(elem)
// PushFinished
// Put an isolation barrier that will prevent the onPull of this op to be called again. This barrier
// is different from simple Finished that it allows onUpstreamTerminated to pass through, unless onPull
// has been called on the stage
-pipeline(activeOpIndex) = PushFinished.asInstanceOf[UntypedOp]
pushAndFinishCommon(elem, PushFinished.asInstanceOf[UntypedOp])
elementInFlight = elem
state = PushFinish
null
@ -623,9 +639,20 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
activeOpIndex = savePos
}
-def init(): Unit = {
-initBoundaries()
/**
* Initializes all stages setting their initial context and calling [[AbstractStage.preStart]] on each.
*/
def init(): InitializationStatus = {
val failures = initBoundaries()
runDetached()
if (failures.isEmpty) InitializationSuccessful
else {
val failure = failures.head
activeOpIndex = failure.op
currentOp.enterAndFail(failure.ex)
InitializationFailed(failures)
}
}
def isFinished: Boolean = pipeline(Upstream) == Finished && pipeline(Downstream) == Finished
@ -652,7 +679,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
/**
* This method injects a Context to each of the BoundaryStages and AsyncStages. This will be the context returned by enter().
*/
-private def initBoundaries(): Unit = {
private def initBoundaries(): List[InitializationFailure] = {
var failures: List[InitializationFailure] = Nil
var op = 0
while (op < pipeline.length) {
(pipeline(op): Any) match {
@ -662,16 +690,26 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
case a: AsyncStage[Any, Any, Any] @unchecked ⇒
a.context = new EntryState("async", op)
activeOpIndex = op
-a.initAsyncInput(a.context) // TODO remove asyncInput? it's like preStart
a.preStart(a.context)
-case _ ⇒
case a: AbstractStage[Any, Any, Any, Any, Any] @unchecked ⇒
val state = new EntryState("stage", op)
a.context = state
try a.preStart(state) catch {
case NonFatal(ex) ⇒
failures ::= InitializationFailure(op, ex) // not logging here as 'most downstream' exception will be signalled via onError
// TODO could use decider here, but semantics become a bit iffy (Resume => ignore error in prestart? Doesn't sound like a good idea).
}
}
op += 1
}
failures
}
-private def finishCurrentOp(): Unit = {
-pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
private def finishCurrentOp(finishState: UntypedOp = Finished.asInstanceOf[UntypedOp]): Unit = {
try pipeline(activeOpIndex).postStop()
catch { case NonFatal(ex) ⇒ log.error(s"Stage [{}] postStop failed", ex) }
finally pipeline(activeOpIndex) = finishState
}
/**


@@ -3,8 +3,9 @@
*/
package akka.stream.impl.fusing
-import akka.stream.stage._
import akka.event.NoLogging
import akka.stream._
import akka.stream.stage._
/**
* INTERNAL API
@@ -97,6 +98,7 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S
private val downstream = IteratorDownstream[O]()
private val interpreter = new OneBoundedInterpreter(upstream +: ops.asInstanceOf[Seq[Stage[_, _]]] :+ downstream,
(op, ctx, evt) ⇒ throw new UnsupportedOperationException("IteratorInterpreter is fully synchronous"),
NoLogging,
NoFlowMaterializer)
interpreter.init()


@@ -381,7 +381,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
private var callback: AsyncCallback[Notification] = _
private val elemsInFlight = FixedSizeBuffer[Try[Out]](parallelism)
-override def initAsyncInput(ctx: AsyncContext[Out, Notification]): Unit = {
override def preStart(ctx: AsyncContext[Out, Notification]): Unit = {
callback = ctx.getAsyncCallback()
}
@@ -465,9 +465,8 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
private def todo = inFlight + buffer.used
-override def initAsyncInput(ctx: AsyncContext[Out, Try[Out]]): Unit = {
override def preStart(ctx: AsyncContext[Out, Try[Out]]): Unit =
callback = ctx.getAsyncCallback()
-}
override def decide(ex: Throwable) = decider(ex)
@@ -527,17 +526,17 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt
private var logLevels: LogLevels = _
private var log: LoggingAdapter = _
-// TODO implement as real preStart once https://github.com/akka/akka/pull/17295 is done
-def preStart(ctx: Context[T]): Unit = {
// TODO more optimisations can be done here - prepare logOnPush function etc
override def preStart(ctx: Context[T]): Unit = {
logLevels = ctx.attributes.logLevels.getOrElse(DefaultLogLevels)
-log = logAdapter getOrElse {
-val sys = ctx.materializer.asInstanceOf[ActorFlowMaterializer].system
-Logging(sys, DefaultLoggerName)
log = logAdapter match {
case Some(l) ⇒ l
case _ ⇒ Logging(ctx.materializer.asInstanceOf[ActorFlowMaterializer].system, DefaultLoggerName)
}
}
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
-if (log == null) preStart(ctx)
if (isEnabled(logLevels.onElement))
log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem))
@@ -545,7 +544,6 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
-if (log == null) preStart(ctx)
if (isEnabled(logLevels.onFailure))
logLevels.onFailure match {
case Logging.ErrorLevel ⇒ log.error(cause, "[{}] Upstream failed.", name)
@@ -556,7 +554,6 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
-if (log == null) preStart(ctx)
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Upstream finished.", name)
@@ -564,7 +561,6 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt
}
override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = {
-if (log == null) preStart(ctx)
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Downstream finished.", name)


@@ -90,6 +90,16 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C
context.execute()
}
/**
* User overridable callback.
* <p/>
* It is called before any other method defined on the `Stage`.
* Empty default implementation.
*/
@throws(classOf[Exception])
def preStart(ctx: Ctx): Unit = () // TODO or hide as LifecycleContext... then AsyncStage cannot do anything about it
// TODO hide here and make Async Stage final def preStart + def asyncPreStart ???
/**
* `onPush` is called when an element from upstream is available and there is demand from downstream, i.e.
* in `onPush` you are allowed to call [[akka.stream.stage.Context#push]] to emit one element downstreams,
@ -148,6 +158,15 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C
*/
def onUpstreamFailure(cause: Throwable, ctx: Ctx): TerminationDirective = ctx.fail(cause)
/**
* User overridable callback.
* <p/>
* Is called after the Stage's final action is performed. // TODO need better wording here
* Empty default implementation.
*/
@throws(classOf[Exception])
def postStop(): Unit = ()
/**
* If an exception is thrown from [[#onPush]] this method is invoked to decide how
* to handle the exception. By default this method returns [[Supervision.Stop]].
@@ -275,13 +294,6 @@ abstract class AsyncStage[In, Out, Ext]
extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, AsyncContext[Out, Ext]] {
private[stream] override def isDetached = true
-/**
-* Initial input for the asynchronous side of this Stage. This can be overridden
-* to set initial asynchronous requests in motion or schedule asynchronous
-* events.
-*/
-def initAsyncInput(ctx: AsyncContext[Out, Ext]): Unit = ()
/**
* Implement this method to define the action to be taken in response to an
* asynchronous notification that was previously registered using
@@ -519,10 +531,21 @@ sealed trait TerminationDirective extends SyncDirective
// never instantiated
sealed abstract class FreeDirective private () extends UpstreamDirective with DownstreamDirective with TerminationDirective with AsyncDirective
trait LifecycleContext {
/**
* Returns the FlowMaterializer that was used to materialize this [[Stage]].
* It can be used to materialize sub-flows.
*/
def materializer: FlowMaterializer
/** Returns operation attributes associated with this Stage */
def attributes: OperationAttributes
}
/**
* Passed to the callback methods of [[PushPullStage]] and [[StatefulStage]].
*/
-sealed trait Context[Out] {
sealed trait Context[Out] extends LifecycleContext {
/**
* INTERNAL API
*/
@@ -565,14 +588,6 @@ sealed trait Context[Out] {
*/
def isFinishing: Boolean
-/**
-* Returns the FlowMaterializer that was used to materialize this [[Stage]].
-* It can be used to materialize sub-flows.
-*/
-def materializer: FlowMaterializer
-/** Returns operation attributes associated with the this Stage */
-def attributes: OperationAttributes
}
/** /**