Migrate PushStages in LifecycleInterpreterSpec to GraphStages #19834.
Includes: - PreStartAndPostStopIdentity - PreStartFailer - PostStopFailer - PushFinishStage
This commit is contained in:
parent
2418e610ab
commit
f9a771497f
1 changed files with 92 additions and 58 deletions
|
|
@ -3,6 +3,8 @@
|
|||
*/
|
||||
package akka.stream.impl.fusing
|
||||
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
|
||||
import akka.stream.stage._
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.stream.testkit.Utils.TE
|
||||
|
|
@ -14,10 +16,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
|
||||
"Interpreter" must {
|
||||
|
||||
"call preStart in order on stages" in new OneBoundedSetup[String](Seq(
|
||||
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a"),
|
||||
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-b"),
|
||||
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-c"))) {
|
||||
"call preStart in order on stages" in new OneBoundedSetup[String](
|
||||
PreStartAndPostStopIdentity(onStart = () ⇒ testActor ! "start-a"),
|
||||
PreStartAndPostStopIdentity(onStart = () ⇒ testActor ! "start-b"),
|
||||
PreStartAndPostStopIdentity(onStart = () ⇒ testActor ! "start-c")) {
|
||||
expectMsg("start-a")
|
||||
expectMsg("start-b")
|
||||
expectMsg("start-c")
|
||||
|
|
@ -25,10 +27,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
upstream.onComplete()
|
||||
}
|
||||
|
||||
"call postStop in order on stages - when upstream completes" in new OneBoundedSetup[String](Seq(
|
||||
"call postStop in order on stages - when upstream completes" in new OneBoundedSetup[String](
|
||||
PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-a", onStop = () ⇒ testActor ! "stop-a"),
|
||||
PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-b", onStop = () ⇒ testActor ! "stop-b"),
|
||||
PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-c", onStop = () ⇒ testActor ! "stop-c"))) {
|
||||
PreStartAndPostStopIdentity(onUpstreamCompleted = () ⇒ testActor ! "complete-c", onStop = () ⇒ testActor ! "stop-c")) {
|
||||
upstream.onComplete()
|
||||
expectMsg("complete-a")
|
||||
expectMsg("stop-a")
|
||||
|
|
@ -39,10 +41,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
expectNoMsg(300.millis)
|
||||
}
|
||||
|
||||
"call postStop in order on stages - when upstream onErrors" in new OneBoundedSetup[String](Seq(
|
||||
"call postStop in order on stages - when upstream onErrors" in new OneBoundedSetup[String](
|
||||
PreStartAndPostStopIdentity(
|
||||
onUpstreamFailed = ex ⇒ testActor ! ex.getMessage,
|
||||
onStop = () ⇒ testActor ! "stop-c"))) {
|
||||
onStop = () ⇒ testActor ! "stop-c")) {
|
||||
val msg = "Boom! Boom! Boom!"
|
||||
upstream.onError(TE(msg))
|
||||
expectMsg(msg)
|
||||
|
|
@ -50,10 +52,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
expectNoMsg(300.millis)
|
||||
}
|
||||
|
||||
"call postStop in order on stages - when downstream cancels" in new OneBoundedSetup[String](Seq(
|
||||
"call postStop in order on stages - when downstream cancels" in new OneBoundedSetup[String](
|
||||
PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-a"),
|
||||
PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-b"),
|
||||
PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-c"))) {
|
||||
PreStartAndPostStopIdentity(onStop = () ⇒ testActor ! "stop-c")) {
|
||||
downstream.cancel()
|
||||
expectMsg("stop-c")
|
||||
expectMsg("stop-b")
|
||||
|
|
@ -61,8 +63,8 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
expectNoMsg(300.millis)
|
||||
}
|
||||
|
||||
"call preStart before postStop" in new OneBoundedSetup[String](Seq(
|
||||
PreStartAndPostStopIdentity(onStart = _ ⇒ testActor ! "start-a", onStop = () ⇒ testActor ! "stop-a"))) {
|
||||
"call preStart before postStop" in new OneBoundedSetup[String](
|
||||
PreStartAndPostStopIdentity(onStart = () ⇒ testActor ! "start-a", onStop = () ⇒ testActor ! "stop-a")) {
|
||||
expectMsg("start-a")
|
||||
expectNoMsg(300.millis)
|
||||
upstream.onComplete()
|
||||
|
|
@ -70,34 +72,34 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
expectNoMsg(300.millis)
|
||||
}
|
||||
|
||||
"onError when preStart fails" in new OneBoundedSetup[String](Seq(
|
||||
PreStartFailer(() ⇒ throw TE("Boom!")))) {
|
||||
"onError when preStart fails" in new OneBoundedSetup[String](
|
||||
PreStartFailer(() ⇒ throw TE("Boom!"))) {
|
||||
lastEvents() should ===(Set(Cancel, OnError(TE("Boom!"))))
|
||||
}
|
||||
|
||||
"not blow up when postStop fails" in new OneBoundedSetup[String](Seq(
|
||||
PostStopFailer(() ⇒ throw TE("Boom!")))) {
|
||||
"not blow up when postStop fails" in new OneBoundedSetup[String](
|
||||
PostStopFailer(() ⇒ throw TE("Boom!"))) {
|
||||
upstream.onComplete()
|
||||
lastEvents() should ===(Set(OnComplete))
|
||||
}
|
||||
|
||||
"onError when preStart fails with stages after" in new OneBoundedSetup[String](Seq(
|
||||
Map((x: Int) ⇒ x, stoppingDecider),
|
||||
"onError when preStart fails with stages after" in new OneBoundedSetup[String](
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS,
|
||||
PreStartFailer(() ⇒ throw TE("Boom!")),
|
||||
Map((x: Int) ⇒ x, stoppingDecider))) {
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS) {
|
||||
lastEvents() should ===(Set(Cancel, OnError(TE("Boom!"))))
|
||||
}
|
||||
|
||||
"continue with stream shutdown when postStop fails" in new OneBoundedSetup[String](Seq(
|
||||
PostStopFailer(() ⇒ throw TE("Boom!")))) {
|
||||
"continue with stream shutdown when postStop fails" in new OneBoundedSetup[String](
|
||||
PostStopFailer(() ⇒ throw TE("Boom!"))) {
|
||||
lastEvents() should ===(Set())
|
||||
|
||||
upstream.onComplete()
|
||||
lastEvents should ===(Set(OnComplete))
|
||||
}
|
||||
|
||||
"postStop when pushAndFinish called if upstream completes with pushAndFinish" in new OneBoundedSetup[String](Seq(
|
||||
new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"))) {
|
||||
"postStop when pushAndFinish called if upstream completes with pushAndFinish" in new OneBoundedSetup[String](
|
||||
new PushFinishStage(onPostStop = () ⇒ testActor ! "stop")) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
|
|
@ -109,10 +111,10 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
expectMsg("stop")
|
||||
}
|
||||
|
||||
"postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[String](Seq(
|
||||
Map((x: Any) ⇒ x, stoppingDecider),
|
||||
"postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[String](
|
||||
Map((x: Any) ⇒ x, stoppingDecider).toGS,
|
||||
new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"),
|
||||
Map((x: Any) ⇒ x, stoppingDecider))) {
|
||||
Map((x: Any) ⇒ x, stoppingDecider).toGS) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
|
|
@ -124,9 +126,9 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
expectMsg("stop")
|
||||
}
|
||||
|
||||
"postStop when pushAndFinish called with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new OneBoundedSetup[String](Seq(
|
||||
"postStop when pushAndFinish called with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new OneBoundedSetup[String](
|
||||
new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"),
|
||||
Fold("", (x: String, y: String) ⇒ x + y, stoppingDecider))) {
|
||||
Fold("", (x: String, y: String) ⇒ x + y, stoppingDecider).toGS) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
|
|
@ -141,53 +143,85 @@ class LifecycleInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
}
|
||||
|
||||
private[akka] case class PreStartAndPostStopIdentity[T](
|
||||
onStart: LifecycleContext ⇒ Unit = _ ⇒ (),
|
||||
onStart: () ⇒ Unit = () ⇒ (),
|
||||
onStop: () ⇒ Unit = () ⇒ (),
|
||||
onUpstreamCompleted: () ⇒ Unit = () ⇒ (),
|
||||
onUpstreamFailed: Throwable ⇒ Unit = ex ⇒ ())
|
||||
extends PushStage[T, T] {
|
||||
override def preStart(ctx: LifecycleContext) = onStart(ctx)
|
||||
onUpstreamFailed: Throwable ⇒ Unit = ex ⇒ ()) extends SimpleLinearGraphStage[T] {
|
||||
|
||||
override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem)
|
||||
override def createLogic(attributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def preStart(): Unit = onStart()
|
||||
override def postStop(): Unit = onStop()
|
||||
|
||||
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
|
||||
onUpstreamCompleted()
|
||||
super.onUpstreamFinish(ctx)
|
||||
}
|
||||
override def onPush(): Unit = push(out, grab(in))
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
|
||||
onUpstreamFailed(cause)
|
||||
super.onUpstreamFailure(cause, ctx)
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
onUpstreamCompleted()
|
||||
super.onUpstreamFinish()
|
||||
}
|
||||
|
||||
override def postStop() = onStop()
|
||||
override def onUpstreamFailure(cause: Throwable): Unit = {
|
||||
onUpstreamFailed(cause)
|
||||
super.onUpstreamFailure(cause)
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
|
||||
override def toString = "PreStartAndPostStopIdentity"
|
||||
}
|
||||
|
||||
private[akka] case class PreStartFailer[T](pleaseThrow: () ⇒ Unit) extends PushStage[T, T] {
|
||||
private[akka] case class PreStartFailer[T](pleaseThrow: () ⇒ Unit) extends SimpleLinearGraphStage[T] {
|
||||
|
||||
override def preStart(ctx: LifecycleContext) =
|
||||
pleaseThrow()
|
||||
override def createLogic(attributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def preStart(): Unit = pleaseThrow()
|
||||
|
||||
override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem)
|
||||
override def onPush(): Unit = push(out, grab(in))
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
|
||||
override def toString = "PreStartFailer"
|
||||
}
|
||||
|
||||
private[akka] case class PostStopFailer[T](ex: () ⇒ Throwable) extends PushStage[T, T] {
|
||||
override def onUpstreamFinish(ctx: Context[T]) = ctx.finish()
|
||||
override def onPush(elem: T, ctx: Context[T]) = ctx.push(elem)
|
||||
private[akka] case class PostStopFailer[T](pleaseThrow: () ⇒ Unit) extends SimpleLinearGraphStage[T] {
|
||||
|
||||
override def postStop(): Unit = throw ex()
|
||||
override def createLogic(attributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def onUpstreamFinish(): Unit = completeStage()
|
||||
|
||||
override def onPush(): Unit = push(out, grab(in))
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
override def postStop(): Unit = pleaseThrow()
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
|
||||
override def toString = "PostStopFailer"
|
||||
}
|
||||
|
||||
// This test is related to issue #17351
|
||||
private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends PushStage[Any, Any] {
|
||||
override def onPush(elem: Any, ctx: Context[Any]): SyncDirective =
|
||||
ctx.pushAndFinish(elem)
|
||||
private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends SimpleLinearGraphStage[Any] {
|
||||
|
||||
override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective =
|
||||
ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen"))
|
||||
override def createLogic(attributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def onPush(): Unit = {
|
||||
push(out, grab(in))
|
||||
completeStage()
|
||||
}
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
override def postStop(): Unit =
|
||||
onPostStop()
|
||||
override def onUpstreamFinish(): Unit = failStage(TE("Cannot happen"))
|
||||
|
||||
override def postStop(): Unit = onPostStop()
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
|
||||
override def toString = "PushFinish"
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue