diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index c0bd29dd10..8faabe0aac 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -4,6 +4,8 @@ package akka.stream.impl.fusing import akka.event.Logging +import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream.Supervision.Decider import akka.stream._ import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, Failed, GraphAssembly, UpstreamBoundaryStageLogic } import akka.stream.stage.AbstractStage.PushPullGraphStage @@ -307,7 +309,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { } } - abstract class OneBoundedSetup[T](_ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder { + abstract class OneBoundedSetupWithDecider[T](decider: Decider, _ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder { val ops = _ops.toArray val upstream = new UpstreamOneBoundedProbe[T] @@ -329,7 +331,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { import GraphInterpreter.Boundary var i = 0 - val attributes = Array.fill[Attributes](ops.length)(Attributes.none) + val attributes = Array.fill[Attributes](ops.length)(ActorAttributes.supervisionStrategy(decider)) val ins = Array.ofDim[Inlet[_]](ops.length + 1) val inOwners = Array.ofDim[Int](ops.length + 1) val outs = Array.ofDim[Outlet[_]](ops.length + 1) @@ -429,4 +431,5 @@ trait GraphInterpreterSpecKit extends StreamSpec { } + abstract class OneBoundedSetup[T](_ops: GraphStageWithMaterializedValue[Shape, Any]*) extends OneBoundedSetupWithDecider[T](Supervision.stoppingDecider, _ops: _*) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index 355c6453b7..0ffdb7b440 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -19,12 +19,6 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit override def toString = "TE" } - class ResumingMap[In, Out](_f: In ⇒ Out) extends Map(_f) { - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - super.createLogic(inheritedAttributes.and(ActorAttributes.supervisionStrategy(resumingDecider))) - } - "Interpreter error handling" must { "handle external failure" in new OneBoundedSetup[Int](Map((x: Int) ⇒ x + 1)) { @@ -62,8 +56,9 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(Cancel, OnError(TE))) } - "resume when Map throws" in new OneBoundedSetup[Int]( - new ResumingMap((x: Int) ⇒ if (x == 0) throw TE else x) + "resume when Map throws" in new OneBoundedSetupWithDecider[Int]( + Supervision.resumingDecider, + Map((x: Int) ⇒ if (x == 0) throw TE else x) ) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) @@ -88,10 +83,11 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(OnNext(4))) } - "resume when Map throws in middle of the chain" in new OneBoundedSetup[Int]( - new ResumingMap((x: Int) ⇒ x + 1), - new ResumingMap((x: Int) ⇒ if (x == 0) throw TE else x + 10), - new ResumingMap((x: Int) ⇒ x + 100) + "resume when Map throws in middle of the chain" in new OneBoundedSetupWithDecider[Int]( + Supervision.resumingDecider, + Map((x: Int) ⇒ x + 1), + Map((x: Int) ⇒ if (x == 0) throw TE else x + 10), + Map((x: Int) ⇒ x + 100) ) { downstream.requestOne() @@ -108,9 +104,10 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(OnNext(114))) } - "resume when Map throws before Grouped" in new OneBoundedSetup[Int]( - new ResumingMap((x: Int) ⇒ x + 1), - new ResumingMap((x: Int) ⇒ if (x <= 0) throw TE else x + 10), + "resume when Map throws before Grouped" in new OneBoundedSetupWithDecider[Int]( + Supervision.resumingDecider, + Map((x: Int) ⇒ x + 1), + Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10), Grouped(3)) { downstream.requestOne() @@ -128,9 +125,10 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(OnNext(Vector(13, 14, 15)))) } - "complete after resume when Map throws before Grouped" in new OneBoundedSetup[Int]( - new ResumingMap((x: Int) ⇒ x + 1), - new ResumingMap((x: Int) ⇒ if (x <= 0) throw TE else x + 10), + "complete after resume when Map throws before Grouped" in new OneBoundedSetupWithDecider[Int]( + Supervision.resumingDecider, + Map((x: Int) ⇒ x + 1), + Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10), Grouped(1000)) { downstream.requestOne() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d1dbdb544a..96c7ec907c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -25,8 +25,7 @@ import akka.stream.impl.Stages.DefaultAttributes /** * INTERNAL API */ -// FIXME: Not final because InterpreterSupervisionSpec. Some better option is needed here -case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Out]] { +final case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("Map.in") val out = Outlet[Out]("Map.out") override val shape = FlowShape(in, out)