diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala index 84129085e9..8960456239 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala @@ -3,6 +3,8 @@ */ package akka.stream.impl.fusing +import akka.stream.Attributes +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.stage._ import akka.testkit.AkkaSpec import akka.stream.testkit.Utils.TE @@ -14,10 +16,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { "Interpreter" must { - "call preStart in order on stages" in new OneBoundedSetup[String](Seq( - PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a"), - PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-b"), - PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-c"))) { + "call preStart in order on stages" in new OneBoundedSetup[String]( + PreStartAndPostStopIdentity(onStart = () ⇒ testActor ! "start-a"), + PreStartAndPostStopIdentity(onStart = () ⇒ testActor ! "start-b"), + PreStartAndPostStopIdentity(onStart = () ⇒ testActor ! "start-c")) { expectMsg("start-a") expectMsg("start-b") expectMsg("start-c") @@ -25,10 +27,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { upstream.onComplete() } - "call postStop in order on stages - when upstream completes" in new OneBoundedSetup[String](Seq( + "call postStop in order on stages - when upstream completes" in new OneBoundedSetup[String]( PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-a", onStop = () ⇒ testActor ! "stop-a"), PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-b", onStop = () ⇒ testActor ! "stop-b"), - PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-c", onStop = () ⇒ testActor ! "stop-c"))) { + PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-c", onStop = () ⇒ testActor ! "stop-c")) { upstream.onComplete() expectMsg("complete-a") expectMsg("stop-a") @@ -39,10 +41,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { expectNoMsg(300.millis) } - "call postStop in order on stages - when upstream onErrors" in new OneBoundedSetup[String](Seq( + "call postStop in order on stages - when upstream onErrors" in new OneBoundedSetup[String]( PreStartAndPostStopIdentity( onUpstreamFailed = ex ⇒ testActor ! ex.getMessage, - onStop = () ⇒ testActor ! "stop-c"))) { + onStop = () ⇒ testActor ! "stop-c")) { val msg = "Boom! Boom! Boom!" upstream.onError(TE(msg)) expectMsg(msg) @@ -50,10 +52,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { expectNoMsg(300.millis) } - "call postStop in order on stages - when downstream cancels" in new OneBoundedSetup[String](Seq( + "call postStop in order on stages - when downstream cancels" in new OneBoundedSetup[String]( PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-a"), PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-b"), - PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-c"))) { + PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-c")) { downstream.cancel() expectMsg("stop-c") expectMsg("stop-b") @@ -61,8 +63,8 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { expectNoMsg(300.millis) } - "call preStart before postStop" in new OneBoundedSetup[String](Seq( - PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a", onStop = () ⇒ testActor ! "stop-a"))) { + "call preStart before postStop" in new OneBoundedSetup[String]( + PreStartAndPostStopIdentity(onStart = () ⇒ testActor ! "start-a", onStop = () ⇒ testActor ! "stop-a")) { expectMsg("start-a") expectNoMsg(300.millis) upstream.onComplete() @@ -70,34 +72,34 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { expectNoMsg(300.millis) } - "onError when preStart fails" in new OneBoundedSetup[String](Seq( - PreStartFailer(() ⇒ throw TE("Boom!")))) { + "onError when preStart fails" in new OneBoundedSetup[String]( + PreStartFailer(() ⇒ throw TE("Boom!"))) { lastEvents() should ===(Set(Cancel, OnError(TE("Boom!")))) } - "not blow up when postStop fails" in new OneBoundedSetup[String](Seq( - PostStopFailer(() ⇒ throw TE("Boom!")))) { + "not blow up when postStop fails" in new OneBoundedSetup[String]( + PostStopFailer(() ⇒ throw TE("Boom!"))) { upstream.onComplete() lastEvents() should ===(Set(OnComplete)) } - "onError when preStart fails with stages after" in new OneBoundedSetup[String](Seq( - Map((x: Int) ⇒ x, stoppingDecider), + "onError when preStart fails with stages after" in new OneBoundedSetup[String]( + Map((x: Int) ⇒ x, stoppingDecider).toGS, PreStartFailer(() ⇒ throw TE("Boom!")), - Map((x: Int) ⇒ x, stoppingDecider))) { + Map((x: Int) ⇒ x, stoppingDecider).toGS) { lastEvents() should ===(Set(Cancel, OnError(TE("Boom!")))) } - "continue with stream shutdown when postStop fails" in new OneBoundedSetup[String](Seq( - PostStopFailer(() ⇒ throw TE("Boom!")))) { + "continue with stream shutdown when postStop fails" in new OneBoundedSetup[String]( + PostStopFailer(() ⇒ throw TE("Boom!"))) { lastEvents() should ===(Set()) upstream.onComplete() lastEvents should ===(Set(OnComplete)) } - "postStop when pushAndFinish called if upstream completes with pushAndFinish" in new OneBoundedSetup[String](Seq( - new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"))) { + "postStop when pushAndFinish called if upstream completes with pushAndFinish" in new OneBoundedSetup[String]( + new PushFinishStage(onPostStop = () ⇒ testActor ! "stop")) { lastEvents() should be(Set.empty) @@ -109,10 +111,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { expectMsg("stop") } - "postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[String](Seq( - Map((x: Any) ⇒ x, stoppingDecider), + "postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[String]( + Map((x: Any) ⇒ x, stoppingDecider).toGS, new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"), - Map((x: Any) ⇒ x, stoppingDecider))) { + Map((x: Any) ⇒ x, stoppingDecider).toGS) { lastEvents() should be(Set.empty) @@ -124,9 +126,9 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { expectMsg("stop") } - "postStop when pushAndFinish called with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new OneBoundedSetup[String](Seq( + "postStop when pushAndFinish called with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new OneBoundedSetup[String]( new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"), - Fold("", (x: String, y: String) ⇒ x + y, stoppingDecider))) { + Fold("", (x: String, y: String) ⇒ x + y, stoppingDecider).toGS) { lastEvents() should be(Set.empty) @@ -141,53 +143,85 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { } private[akka] case class PreStartAndPostStopIdentity[T]( - onStart: LifecycleContext ⇒ Unit = _ ⇒ (), + onStart: () ⇒ Unit = () ⇒ (), onStop: () ⇒ Unit = () ⇒ (), onUpstreamCompleted: () ⇒ Unit = () ⇒ (), - onUpstreamFailed: Throwable ⇒ Unit = ex ⇒ ()) - extends PushStage[T, T] { - override def preStart(ctx: LifecycleContext) = onStart(ctx) + onUpstreamFailed: Throwable ⇒ Unit = ex ⇒ ()) extends SimpleLinearGraphStage[T] { - override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) + override def createLogic(attributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def preStart(): Unit = onStart() + override def postStop(): Unit = onStop() - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - onUpstreamCompleted() - super.onUpstreamFinish(ctx) - } + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) - override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { - onUpstreamFailed(cause) - super.onUpstreamFailure(cause, ctx) - } + override def onUpstreamFinish(): Unit = { + onUpstreamCompleted() + super.onUpstreamFinish() + } - override def postStop() = onStop() + override def onUpstreamFailure(cause: Throwable): Unit = { + onUpstreamFailed(cause) + super.onUpstreamFailure(cause) + } + + setHandlers(in, out, this) + } + + override def toString = "PreStartAndPostStopIdentity" } - private[akka] case class PreStartFailer[T](pleaseThrow: () ⇒ Unit) extends PushStage[T, T] { + private[akka] case class PreStartFailer[T](pleaseThrow: () ⇒ Unit) extends SimpleLinearGraphStage[T] { - override def preStart(ctx: LifecycleContext) = - pleaseThrow() + override def createLogic(attributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def preStart(): Unit = pleaseThrow() - override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } + + override def toString = "PreStartFailer" } - private[akka] case class PostStopFailer[T](ex: () ⇒ Throwable) extends PushStage[T, T] { - override def onUpstreamFinish(ctx: Context[T]) = ctx.finish() - override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) + private[akka] case class PostStopFailer[T](pleaseThrow: () ⇒ Unit) extends SimpleLinearGraphStage[T] { - override def postStop(): Unit = throw ex() + override def createLogic(attributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onUpstreamFinish(): Unit = completeStage() + + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + + override def postStop(): Unit = pleaseThrow() + + setHandlers(in, out, this) + } + + override def toString = "PostStopFailer" } // This test is related to issue #17351 - private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends PushStage[Any, Any] { - override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = - ctx.pushAndFinish(elem) + private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends SimpleLinearGraphStage[Any] { - override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = - ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen")) + override def createLogic(attributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = { + push(out, grab(in)) + completeStage() + } + override def onPull(): Unit = pull(in) - override def postStop(): Unit = - onPostStop() + override def onUpstreamFinish(): Unit = failStage(TE("Cannot happen")) + + override def postStop(): Unit = onPostStop() + + setHandlers(in, out, this) + } + + override def toString = "PushFinish" } - }