Make map stage final and refactor OneBoundedSetup to take a decider. (#21374)

* Remove new from Map constructions
This commit is contained in:
Michał Płachta 2016-09-08 11:47:17 +02:00 committed by Patrik Nordwall
parent fc3761bc4b
commit b3bba1229f
3 changed files with 22 additions and 22 deletions

View file

@ -4,6 +4,8 @@
package akka.stream.impl.fusing
import akka.event.Logging
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
import akka.stream._
import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, Failed, GraphAssembly, UpstreamBoundaryStageLogic }
import akka.stream.stage.AbstractStage.PushPullGraphStage
@ -307,7 +309,7 @@ trait GraphInterpreterSpecKit extends StreamSpec {
}
}
abstract class OneBoundedSetup[T](_ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder {
abstract class OneBoundedSetupWithDecider[T](decider: Decider, _ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder {
val ops = _ops.toArray
val upstream = new UpstreamOneBoundedProbe[T]
@ -329,7 +331,7 @@ trait GraphInterpreterSpecKit extends StreamSpec {
import GraphInterpreter.Boundary
var i = 0
val attributes = Array.fill[Attributes](ops.length)(Attributes.none)
val attributes = Array.fill[Attributes](ops.length)(ActorAttributes.supervisionStrategy(decider))
val ins = Array.ofDim[Inlet[_]](ops.length + 1)
val inOwners = Array.ofDim[Int](ops.length + 1)
val outs = Array.ofDim[Outlet[_]](ops.length + 1)
@ -429,4 +431,5 @@ trait GraphInterpreterSpecKit extends StreamSpec {
}
abstract class OneBoundedSetup[T](_ops: GraphStageWithMaterializedValue[Shape, Any]*) extends OneBoundedSetupWithDecider[T](Supervision.stoppingDecider, _ops: _*)
}

View file

@ -19,12 +19,6 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
override def toString = "TE"
}
class ResumingMap[In, Out](_f: In Out) extends Map(_f) {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
super.createLogic(inheritedAttributes.and(ActorAttributes.supervisionStrategy(resumingDecider)))
}
"Interpreter error handling" must {
"handle external failure" in new OneBoundedSetup[Int](Map((x: Int) x + 1)) {
@ -62,8 +56,9 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
lastEvents() should be(Set(Cancel, OnError(TE)))
}
"resume when Map throws" in new OneBoundedSetup[Int](
new ResumingMap((x: Int) if (x == 0) throw TE else x)
"resume when Map throws" in new OneBoundedSetupWithDecider[Int](
Supervision.resumingDecider,
Map((x: Int) if (x == 0) throw TE else x)
) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
@ -88,10 +83,11 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
lastEvents() should be(Set(OnNext(4)))
}
"resume when Map throws in middle of the chain" in new OneBoundedSetup[Int](
new ResumingMap((x: Int) x + 1),
new ResumingMap((x: Int) if (x == 0) throw TE else x + 10),
new ResumingMap((x: Int) x + 100)
"resume when Map throws in middle of the chain" in new OneBoundedSetupWithDecider[Int](
Supervision.resumingDecider,
Map((x: Int) x + 1),
Map((x: Int) if (x == 0) throw TE else x + 10),
Map((x: Int) x + 100)
) {
downstream.requestOne()
@ -108,9 +104,10 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
lastEvents() should be(Set(OnNext(114)))
}
"resume when Map throws before Grouped" in new OneBoundedSetup[Int](
new ResumingMap((x: Int) x + 1),
new ResumingMap((x: Int) if (x <= 0) throw TE else x + 10),
"resume when Map throws before Grouped" in new OneBoundedSetupWithDecider[Int](
Supervision.resumingDecider,
Map((x: Int) x + 1),
Map((x: Int) if (x <= 0) throw TE else x + 10),
Grouped(3)) {
downstream.requestOne()
@ -128,9 +125,10 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
lastEvents() should be(Set(OnNext(Vector(13, 14, 15))))
}
"complete after resume when Map throws before Grouped" in new OneBoundedSetup[Int](
new ResumingMap((x: Int) x + 1),
new ResumingMap((x: Int) if (x <= 0) throw TE else x + 10),
"complete after resume when Map throws before Grouped" in new OneBoundedSetupWithDecider[Int](
Supervision.resumingDecider,
Map((x: Int) x + 1),
Map((x: Int) if (x <= 0) throw TE else x + 10),
Grouped(1000)) {
downstream.requestOne()

View file

@ -25,8 +25,7 @@ import akka.stream.impl.Stages.DefaultAttributes
/**
* INTERNAL API
*/
// FIXME: Not final because InterpreterSupervisionSpec. Some better option is needed here
case class Map[In, Out](f: In Out) extends GraphStage[FlowShape[In, Out]] {
final case class Map[In, Out](f: In Out) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Map.in")
val out = Outlet[Out]("Map.out")
override val shape = FlowShape(in, out)