diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterLifecycleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterLifecycleSpec.scala new file mode 100644 index 0000000000..65e27f8646 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterLifecycleSpec.scala @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl + +import java.util.concurrent.atomic.AtomicBoolean + +import akka.stream.Supervision._ +import akka.stream._ +import akka.stream.impl.fusing.{ InterpreterLifecycleSpecKit, ActorInterpreter } +import akka.stream.stage.Stage +import akka.stream.testkit.Utils.TE +import akka.stream.testkit.{ AkkaSpec, _ } +import scala.concurrent.duration._ + +class ActorInterpreterLifecycleSpec extends AkkaSpec with InterpreterLifecycleSpecKit { + + implicit val mat = ActorFlowMaterializer() + + class Setup(ops: List[Stage[_, _]] = List(fusing.Map({ x: Any ⇒ x }, stoppingDecider))) { + val up = TestPublisher.manualProbe[Int] + val down = TestSubscriber.manualProbe[Int] + private val props = ActorInterpreter.props(mat.settings, ops, mat).withDispatcher("akka.test.stream-dispatcher") + val actor = system.actorOf(props) + val processor = ActorProcessorFactory[Int, Int](actor) + } + + "An ActorInterpreter" must { + + "call preStart in order on stages" in new Setup(List( + PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a"), + PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-b"), + PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-c"))) { + processor.subscribe(down) + val sub = down.expectSubscription() + sub.cancel() + up.subscribe(processor) + val upsub = up.expectSubscription() + upsub.expectCancellation() + + expectMsg("start-a") + expectMsg("start-b") + expectMsg("start-c") + } + + "call postStart in order on stages - when upstream completes" in new Setup(List( + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-a"), + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-b"), + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-c"))) { + processor.subscribe(down) + val sub = down.expectSubscription() + up.subscribe(processor) + val upsub = up.expectSubscription() + upsub.sendComplete() + down.expectComplete() + + expectMsg("stop-a") + expectMsg("stop-b") + expectMsg("stop-c") + } + + "call postStart in order on stages - when downstream cancels" in new Setup(List( + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-a"), + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-b"), + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-c"))) { + processor.subscribe(down) + val sub = down.expectSubscription() + sub.cancel() + up.subscribe(processor) + val upsub = up.expectSubscription() + upsub.expectCancellation() + + expectMsg("stop-c") + expectMsg("stop-b") + expectMsg("stop-a") + } + + "onError downstream when preStart fails" in new Setup(List( + PreStartFailer(() ⇒ throw TE("Boom!")))) { + processor.subscribe(down) + val sub = down.expectSubscription() + up.subscribe(processor) + val upsub = up.expectSubscription() + down.expectError(TE("Boom!")) + } + + "onError only once even with Supervision.restart" in new Setup(List( + PreStartFailer(() ⇒ throw TE("Boom!")))) { + processor.subscribe(down) + val sub = down.expectSubscription() + up.subscribe(processor) + val upsub = up.expectSubscription() + down.expectError(TE("Boom!")) + down.expectNoMsg(1.second) + } + + "onError downstream when preStart fails with 'most downstream' failure, when multiple stages fail" in new Setup(List( + PreStartFailer(() ⇒ throw TE("Boom 1!")), + PreStartFailer(() ⇒ throw TE("Boom 2!")), + PreStartFailer(() ⇒ throw TE("Boom 3!")))) { + processor.subscribe(down) + val sub = down.expectSubscription() + up.subscribe(processor) + val upsub = up.expectSubscription() + down.expectError(TE("Boom 3!")) + down.expectNoMsg(300.millis) + } + + "continue with stream shutdown when postStop fails" in new Setup(List( + PostStopFailer(() ⇒ throw TE("Boom!")))) { + processor.subscribe(down) + val sub = down.expectSubscription() + up.subscribe(processor) + val upsub = up.expectSubscription() + upsub.sendComplete() + down.expectComplete() // failures in postStop are logged, but not propagated // TODO Future features? make this a setting? + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index eae8ac1856..c6a3865b53 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -3,15 +3,10 @@ */ package akka.stream.impl.fusing -import akka.stream.stage.{ TerminationDirective, SyncDirective, Context, PushStage } - -import scala.util.control.NoStackTrace import akka.stream.Supervision class InterpreterSpec extends InterpreterSpecKit { import Supervision.stoppingDecider - import Supervision.resumingDecider - import Supervision.restartingDecider "Interpreter" must { @@ -462,13 +457,6 @@ class InterpreterSpec extends InterpreterSpecKit { } - // This test is related to issue #17351 - class PushFinishStage extends PushStage[Any, Any] { - override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pushAndFinish(elem) - override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = - ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen")) - } - "work with pushAndFinish if upstream completes with pushAndFinish" in new TestSetup(Seq( new PushFinishStage)) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala index a089c77406..fbe2d19976 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala @@ -3,13 +3,64 @@ */ package akka.stream.impl.fusing -import akka.stream.OperationAttributes -import akka.stream.testkit.AkkaSpec +import akka.event.Logging import akka.stream.stage._ +import akka.stream.testkit.AkkaSpec +import akka.stream.{ ActorFlowMaterializer, OperationAttributes } import akka.testkit.TestProbe -import akka.stream.ActorFlowMaterializer -trait InterpreterSpecKit extends AkkaSpec { +trait InterpreterLifecycleSpecKit { + private[akka] case class PreStartAndPostStopIdentity[T]( + onStart: LifecycleContext ⇒ Unit = _ ⇒ (), + onStop: () ⇒ Unit = () ⇒ (), + onUpstreamCompleted: () ⇒ Unit = () ⇒ (), + onUpstreamFailed: Throwable ⇒ Unit = ex ⇒ ()) + extends PushStage[T, T] { + override def preStart(ctx: Context[T]) = onStart(ctx) + + override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) + + override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { + onUpstreamCompleted() + super.onUpstreamFinish(ctx) + } + + override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { + onUpstreamFailed(cause) + super.onUpstreamFailure(cause, ctx) + } + + override def postStop() = onStop() + } + + private[akka] case class PreStartFailer[T](pleaseThrow: () ⇒ Unit) extends PushStage[T, T] { + override def preStart(ctx: Context[T]) = pleaseThrow() + + override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) + } + + private[akka] case class PostStopFailer[T](ex: () ⇒ Throwable) extends PushStage[T, T] { + override def onUpstreamFinish(ctx: Context[T]) = ctx.finish() + override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) + + override def postStop(): Unit = throw ex() + } + + // This test is related to issue #17351 + private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends PushStage[Any, Any] { + override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = + ctx.pushAndFinish(elem) + + override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = + ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen")) + + override def postStop(): Unit = + onPostStop() + } + +} + +trait InterpreterSpecKit extends AkkaSpec with InterpreterLifecycleSpecKit { case object OnComplete case object Cancel @@ -61,6 +112,7 @@ trait InterpreterSpecKit extends AkkaSpec { val sidechannel = TestProbe() val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream, (op, ctx, event) ⇒ sidechannel.ref ! ActorInterpreter.AsyncInput(op, ctx, event), + Logging(system, classOf[TestSetup]), ActorFlowMaterializer(), OperationAttributes.none, forkLimit, overflowToHeap) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala new file mode 100644 index 0000000000..f85e45f73b --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import akka.stream.testkit.Utils.TE + +import scala.concurrent.duration._ + +class LifecycleInterpreterSpec extends InterpreterSpecKit { + import akka.stream.Supervision._ + + "Interpreter" must { + + "call preStart in order on stages" in new TestSetup(Seq( + PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a"), + PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-b"), + PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-c"))) { + expectMsg("start-a") + expectMsg("start-b") + expectMsg("start-c") + expectNoMsg(300.millis) + upstream.onComplete() + } + + "call postStop in order on stages - when upstream completes" in new TestSetup(Seq( + PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-a", onStop = () ⇒ testActor ! "stop-a"), + PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-b", onStop = () ⇒ testActor ! "stop-b"), + PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-c", onStop = () ⇒ testActor ! "stop-c"))) { + upstream.onComplete() + expectMsg("complete-a") + expectMsg("stop-a") + expectMsg("complete-b") + expectMsg("stop-b") + expectMsg("complete-c") + expectMsg("stop-c") + expectNoMsg(300.millis) + } + + "call postStop in order on stages - when upstream onErrors" in new TestSetup(Seq( + PreStartAndPostStopIdentity( + onUpstreamFailed = ex ⇒ testActor ! ex.getMessage, + onStop = () ⇒ testActor ! "stop-c"))) { + val msg = "Boom! Boom! Boom!" + upstream.onError(TE(msg)) + expectMsg(msg) + expectMsg("stop-c") + expectNoMsg(300.millis) + } + + "call postStop in order on stages - when downstream cancels" in new TestSetup(Seq( + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-a"), + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-b"), + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-c"))) { + downstream.cancel() + expectMsg("stop-c") + expectMsg("stop-b") + expectMsg("stop-a") + expectNoMsg(300.millis) + } + + "call preStart before postStop" in new TestSetup(Seq( + PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a", onStop = () ⇒ testActor ! "stop-a"))) { + expectMsg("start-a") + expectNoMsg(300.millis) + upstream.onComplete() + expectMsg("stop-a") + expectNoMsg(300.millis) + } + + "onError when preStart fails" in new TestSetup(Seq( + PreStartFailer(() ⇒ throw TE("Boom!")))) { + lastEvents() should ===(Set(Cancel, OnError(TE("Boom!")))) + } + + "not blow up when postStop fails" in new TestSetup(Seq( + PostStopFailer(() ⇒ throw TE("Boom!")))) { + upstream.onComplete() + lastEvents() should ===(Set(OnComplete)) + } + + "onError when preStart fails with stages after" in new TestSetup(Seq( + Map((x: Int) ⇒ x, stoppingDecider), + PreStartFailer(() ⇒ throw TE("Boom!")), + Map((x: Int) ⇒ x, stoppingDecider))) { + lastEvents() should ===(Set(Cancel, OnError(TE("Boom!")))) + } + + "continue with stream shutdown when postStop fails" in new TestSetup(Seq( + PostStopFailer(() ⇒ throw TE("Boom!")))) { + lastEvents() should ===(Set()) + + upstream.onComplete() + lastEvents should ===(Set(OnComplete)) + } + + "postStop when pushAndFinish called if upstream completes with pushAndFinish" in new TestSetup(Seq( + new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNextAndComplete("foo") + lastEvents() should be(Set(OnNext("foo"), OnComplete)) + expectMsg("stop") + } + + "postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new TestSetup(Seq( + Map((x: Any) ⇒ x, stoppingDecider), + new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"), + Map((x: Any) ⇒ x, stoppingDecider))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNextAndComplete("foo") + lastEvents() should be(Set(OnNext("foo"), OnComplete)) + expectMsg("stop") + } + + "postStop when pushAndFinish called with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new TestSetup(Seq( + new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"), + Fold("", (x: String, y: String) ⇒ x + y, stoppingDecider))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNextAndComplete("foo") + lastEvents() should be(Set(OnNext("foo"), OnComplete)) + expectMsg("stop") + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index fa5c52f6e3..dd6aa6d704 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -59,7 +59,7 @@ object TlsSpec { class Timeout(duration: FiniteDuration)(implicit system: ActorSystem) extends AsyncStage[ByteString, ByteString, Unit] { private var last: ByteString = _ - override def initAsyncInput(ctx: AsyncContext[ByteString, Unit]) = { + override def preStart(ctx: AsyncContext[ByteString, Unit]) = { val cb = ctx.getAsyncCallback() system.scheduler.scheduleOnce(duration)(cb.invoke(()))(system.dispatcher) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index e017d5f1f9..39f33882ab 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -4,18 +4,16 @@ package akka.stream.impl.fusing import java.util.Arrays -import akka.actor.{ Actor, ActorRef } +import akka.actor._ import akka.stream.ActorFlowMaterializerSettings import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete } import akka.stream.impl._ import akka.stream.OperationAttributes +import akka.stream.impl.fusing.OneBoundedInterpreter.{ InitializationFailed, InitializationFailure, InitializationSuccessful } import akka.stream.stage._ import org.reactivestreams.{ Subscriber, Subscription } -import akka.actor.Props -import akka.actor.ActorLogging import akka.event.{ Logging, LoggingAdapter } -import akka.actor.DeadLetterSuppression import akka.stream.ActorFlowMaterializer /** @@ -181,6 +179,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, private var downstreamCompleted = false // this is true while we “hold the ball”; while “false” incoming demand will just be queued up private var upstreamWaiting = true + // when upstream failed before we got the exposed publisher + private var upstreamFailed: Option[Throwable] = None // the number of elements emitted during a single execution is bounded private var burstRemaining = outputBurstLimit @@ -227,6 +227,9 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, log.debug("fail due to: {}", e.getMessage) if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e) + } else if (exposedPublisher == null && upstreamFailed.isEmpty) { + // fail called before the exposed publisher arrived, we must store it and fail when we're first able to + upstreamFailed = Some(e) } } @@ -261,8 +264,13 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, protected def waitingExposedPublisher: Actor.Receive = { case ExposedPublisher(publisher) ⇒ - exposedPublisher = publisher - subreceive.become(downstreamRunning) + upstreamFailed match { + case _: Some[_] ⇒ + publisher.shutdown(upstreamFailed) + case _ ⇒ + exposedPublisher = publisher + subreceive.become(downstreamRunning) + } case other ⇒ throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]") } @@ -319,11 +327,18 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream, (op, ctx, event) ⇒ self ! AsyncInput(op, ctx, event), + Logging(this), materializer, attributes, name = context.self.path.toString) - interpreter.init() + interpreter.init() match { + case failed: InitializationFailed ⇒ + // the Actor will be stopped thanks to aroundReceive checking interpreter.isFinished + upstream.setDownstreamCanceled() + downstream.fail(failed.mostDownstream.ex) + case InitializationSuccessful ⇒ // ok + } def receive: Receive = upstream.subreceive diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index 330b5bc29c..2879cceb66 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -3,13 +3,13 @@ */ package akka.stream.impl.fusing -import akka.stream.{ FlowMaterializer, Supervision } +import akka.event.LoggingAdapter import akka.stream.impl.ReactiveStreamsCompliance -import akka.stream.OperationAttributes import akka.stream.stage._ +import akka.stream.{ FlowMaterializer, OperationAttributes, Supervision } import scala.annotation.{ switch, tailrec } -import scala.collection.breakOut +import scala.collection.{ breakOut, immutable } import scala.util.control.NonFatal /** @@ -18,6 +18,20 @@ import scala.util.control.NonFatal private[akka] object OneBoundedInterpreter { final val Debug = false + /** INTERNAL API */ + private[akka] sealed trait InitializationStatus + /** INTERNAL API */ + private[akka] final case object InitializationSuccessful extends InitializationStatus + /** INTERNAL API */ + private[akka] final case class InitializationFailed(failures: immutable.Seq[InitializationFailure]) extends InitializationStatus { + // exceptions are reverse ordered here, below methods help to avoid confusion when used from the outside + def mostUpstream = failures.last + def mostDownstream = failures.head + } + + /** INTERNAL API */ + private[akka] case class InitializationFailure(op: Int, ex: Throwable) + /** * INTERNAL API * @@ -138,6 +152,7 @@ private[akka] object OneBoundedInterpreter { */ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], onAsyncInput: (AsyncStage[Any, Any, Any], AsyncContext[Any, Any], Any) ⇒ Unit, + log: LoggingAdapter, materializer: FlowMaterializer, attributes: OperationAttributes = OperationAttributes.none, val forkLimit: Int = 100, @@ -312,7 +327,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], def isFinishing: Boolean = hasBits(TerminationPending) - final protected def pushAndFinishCommon(elem: Any): Unit = { + final protected def pushAndFinishCommon(elem: Any, finishState: UntypedOp): Unit = { + finishCurrentOp(finishState) ReactiveStreamsCompliance.requireNonNullElement(elem) if (currentOp.isDetached) { mustHave(DownstreamBall) @@ -321,9 +337,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], } override def pushAndFinish(elem: Any): DownstreamDirective = { - pushAndFinishCommon(elem) - // Spit the execution domain in two, and invoke op postStop callbacks if there are any - finishCurrentOp() + // Spit the execution domain in two and invoke postStop + pushAndFinishCommon(elem, Finished.asInstanceOf[UntypedOp]) // This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution // path. Other forks are not order dependent because they execute on isolated execution domains which cannot @@ -422,11 +437,12 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this) override def pushAndFinish(elem: Any): DownstreamDirective = { - pushAndFinishCommon(elem) + // PushFinished // Put an isolation barrier that will prevent the onPull of this op to be called again. This barrier // is different from simple Finished that it allows onUpstreamTerminated to pass through, unless onPull // has been called on the stage - pipeline(activeOpIndex) = PushFinished.asInstanceOf[UntypedOp] + pushAndFinishCommon(elem, PushFinished.asInstanceOf[UntypedOp]) + elementInFlight = elem state = PushFinish null @@ -623,9 +639,20 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], activeOpIndex = savePos } - def init(): Unit = { - initBoundaries() + /** + * Initializes all stages setting their initial context and calling [[AbstractStage.preStart]] on each. + */ + def init(): InitializationStatus = { + val failures = initBoundaries() runDetached() + + if (failures.isEmpty) InitializationSuccessful + else { + val failure = failures.head + activeOpIndex = failure.op + currentOp.enterAndFail(failure.ex) + InitializationFailed(failures) + } } def isFinished: Boolean = pipeline(Upstream) == Finished && pipeline(Downstream) == Finished @@ -652,7 +679,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], /** * This method injects a Context to each of the BoundaryStages and AsyncStages. This will be the context returned by enter(). */ - private def initBoundaries(): Unit = { + private def initBoundaries(): List[InitializationFailure] = { + var failures: List[InitializationFailure] = Nil var op = 0 while (op < pipeline.length) { (pipeline(op): Any) match { @@ -662,16 +690,26 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], case a: AsyncStage[Any, Any, Any] @unchecked ⇒ a.context = new EntryState("async", op) activeOpIndex = op - a.initAsyncInput(a.context) // TODO remove asyncInput? it's like preStart + a.preStart(a.context) - case _ ⇒ + case a: AbstractStage[Any, Any, Any, Any, Any] @unchecked ⇒ + val state = new EntryState("stage", op) + a.context = state + try a.preStart(state) catch { + case NonFatal(ex) ⇒ + failures ::= InitializationFailure(op, ex) // not logging here as 'most downstream' exception will be signalled via onError + // TODO could use decider here, but semantics become a bit iffy (Resume => ignore error in prestart? Doesn't sound like a good idea). + } } op += 1 } + failures } - private def finishCurrentOp(): Unit = { - pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] + private def finishCurrentOp(finishState: UntypedOp = Finished.asInstanceOf[UntypedOp]): Unit = { + try pipeline(activeOpIndex).postStop() + catch { case NonFatal(ex) ⇒ log.error(s"Stage [{}] postStop failed", ex) } + finally pipeline(activeOpIndex) = finishState } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index 181cdc1b54..590e492b5d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -3,8 +3,9 @@ */ package akka.stream.impl.fusing -import akka.stream.stage._ +import akka.event.NoLogging import akka.stream._ +import akka.stream.stage._ /** * INTERNAL API @@ -97,6 +98,7 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S private val downstream = IteratorDownstream[O]() private val interpreter = new OneBoundedInterpreter(upstream +: ops.asInstanceOf[Seq[Stage[_, _]]] :+ downstream, (op, ctx, evt) ⇒ throw new UnsupportedOperationException("IteratorInterpreter is fully synchronous"), + NoLogging, NoFlowMaterializer) interpreter.init() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index bc38bb0eb0..504e5e3569 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -381,7 +381,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut private var callback: AsyncCallback[Notification] = _ private val elemsInFlight = FixedSizeBuffer[Try[Out]](parallelism) - override def initAsyncInput(ctx: AsyncContext[Out, Notification]): Unit = { + override def preStart(ctx: AsyncContext[Out, Notification]): Unit = { callback = ctx.getAsyncCallback() } @@ -465,9 +465,8 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I private def todo = inFlight + buffer.used - override def initAsyncInput(ctx: AsyncContext[Out, Try[Out]]): Unit = { + override def preStart(ctx: AsyncContext[Out, Try[Out]]): Unit = callback = ctx.getAsyncCallback() - } override def decide(ex: Throwable) = decider(ex) @@ -527,17 +526,17 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt private var logLevels: LogLevels = _ private var log: LoggingAdapter = _ - // TODO implement as real preStart once https://github.com/akka/akka/pull/17295 is done - def preStart(ctx: Context[T]): Unit = { + // TODO more optimisations can be done here - prepare logOnPush function etc + + override def preStart(ctx: Context[T]): Unit = { logLevels = ctx.attributes.logLevels.getOrElse(DefaultLogLevels) - log = logAdapter getOrElse { - val sys = ctx.materializer.asInstanceOf[ActorFlowMaterializer].system - Logging(sys, DefaultLoggerName) + log = logAdapter match { + case Some(l) ⇒ l + case _ ⇒ Logging(ctx.materializer.asInstanceOf[ActorFlowMaterializer].system, DefaultLoggerName) } } override def onPush(elem: T, ctx: Context[T]): SyncDirective = { - if (log == null) preStart(ctx) if (isEnabled(logLevels.onElement)) log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem)) @@ -545,7 +544,6 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt } override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { - if (log == null) preStart(ctx) if (isEnabled(logLevels.onFailure)) logLevels.onFailure match { case Logging.ErrorLevel ⇒ log.error(cause, "[{}] Upstream failed.", name) @@ -556,7 +554,6 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt } override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - if (log == null) preStart(ctx) if (isEnabled(logLevels.onFinish)) log.log(logLevels.onFinish, "[{}] Upstream finished.", name) @@ -564,7 +561,6 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt } override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = { - if (log == null) preStart(ctx) if (isEnabled(logLevels.onFinish)) log.log(logLevels.onFinish, "[{}] Downstream finished.", name) diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index ec027ffd14..f4af139595 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -90,6 +90,16 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C context.execute() } + /** + * User overridable callback. + *

+ * It is called before any other method defined on the `Stage`. + * Empty default implementation. + */ + @throws(classOf[Exception]) + def preStart(ctx: Ctx): Unit = () // TODO or hide as LifecycleContext... then AsyncStage cannot do anything about it + // TODO hide here and make Async Stage final def preStart + def asyncPreStart ??? + /** * `onPush` is called when an element from upstream is available and there is demand from downstream, i.e. * in `onPush` you are allowed to call [[akka.stream.stage.Context#push]] to emit one element downstreams, @@ -148,6 +158,15 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C */ def onUpstreamFailure(cause: Throwable, ctx: Ctx): TerminationDirective = ctx.fail(cause) + /** + * User overridable callback. + *

+ * Is called after the Stages final action is performed. // TODO need better wording here + * Empty default implementation. + */ + @throws(classOf[Exception]) + def postStop(): Unit = () + /** * If an exception is thrown from [[#onPush]] this method is invoked to decide how * to handle the exception. By default this method returns [[Supervision.Stop]]. @@ -275,13 +294,6 @@ abstract class AsyncStage[In, Out, Ext] extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, AsyncContext[Out, Ext]] { private[stream] override def isDetached = true - /** - * Initial input for the asynchronous “side” of this Stage. This can be overridden - * to set initial asynchronous requests in motion or schedule asynchronous - * events. - */ - def initAsyncInput(ctx: AsyncContext[Out, Ext]): Unit = () - /** * Implement this method to define the action to be taken in response to an * asynchronous notification that was previously registered using @@ -519,10 +531,21 @@ sealed trait TerminationDirective extends SyncDirective // never instantiated sealed abstract class FreeDirective private () extends UpstreamDirective with DownstreamDirective with TerminationDirective with AsyncDirective +trait LifecycleContext { + /** + * Returns the FlowMaterializer that was used to materialize this [[Stage]]. + * It can be used to materialize sub-flows. + */ + def materializer: FlowMaterializer + + /** Returns operation attributes associated with the this Stage */ + def attributes: OperationAttributes +} + /** * Passed to the callback methods of [[PushPullStage]] and [[StatefulStage]]. */ -sealed trait Context[Out] { +sealed trait Context[Out] extends LifecycleContext { /** * INTERNAL API */ @@ -565,14 +588,6 @@ sealed trait Context[Out] { */ def isFinishing: Boolean - /** - * Returns the FlowMaterializer that was used to materialize this [[Stage]]. - * It can be used to materialize sub-flows. - */ - def materializer: FlowMaterializer - - /** Returns operation attributes associated with the this Stage */ - def attributes: OperationAttributes } /**