migrating Doubler, KeepGoing to GraphStage

This commit is contained in:
zhxiaog 2016-04-14 00:37:15 +08:00
parent a1423b6e7d
commit 16ac832b14
2 changed files with 64 additions and 36 deletions

View file

@ -8,7 +8,7 @@ import akka.stream.stage._
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter
import akka.stream.Supervision
import akka.stream._
class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
import Supervision.stoppingDecider
@ -80,7 +80,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
}
"implement one-to-many many-to-one chain correctly" in new OneBoundedSetup[Int](
Doubler().toGS,
Doubler(),
Filter((x: Int) x != 0)) {
lastEvents() should be(Set.empty)
@ -106,7 +106,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
"implement many-to-one one-to-many chain correctly" in new OneBoundedSetup[Int](
Filter((x: Int) x != 0),
Doubler().toGS) {
Doubler()) {
lastEvents() should be(Set.empty)
@ -412,7 +412,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
}
"implement doubler-conflate (doubler-batch)" in new OneBoundedSetup[Int](
Doubler().toGS,
Doubler(),
Batch(
1L,
ConstantFun.zeroLong,
@ -432,12 +432,12 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
}
// Note, the new interpreter has no jumpback table, still did not want to remove the test
"work with jumpback table and completed elements" in new OneBoundedSetup[Int](Seq(
Map((x: Int) x, stoppingDecider),
Map((x: Int) x, stoppingDecider),
"work with jumpback table and completed elements" in new OneBoundedSetup[Int](
Map((x: Int) x, stoppingDecider).toGS,
Map((x: Int) x, stoppingDecider).toGS,
KeepGoing(),
Map((x: Int) x, stoppingDecider),
Map((x: Int) x, stoppingDecider))) {
Map((x: Int) x, stoppingDecider).toGS,
Map((x: Int) x, stoppingDecider).toGS) {
lastEvents() should be(Set.empty)
@ -569,39 +569,67 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
}
private[akka] case class Doubler[T]() extends PushPullStage[T, T] {
var oneMore: Boolean = false
var lastElem: T = _
private[akka] final case class Doubler[T]() extends GraphStage[FlowShape[T, T]] {
val out: Outlet[T] = Outlet("Doubler.out")
val in: Inlet[T] = Inlet("Doubler.in")
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
lastElem = elem
oneMore = true
ctx.push(elem)
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
var latest: T = _
var oneMore = false
override def onPull(ctx: Context[T]): SyncDirective = {
if (oneMore) {
oneMore = false
ctx.push(lastElem)
} else ctx.pull()
}
override def onPush(): Unit = {
latest = grab(in)
oneMore = true
push(out, latest)
}
/**
* Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
* is now allowed to be called on this port.
*/
override def onPull(): Unit = {
if (oneMore) {
push(out, latest)
oneMore = false
} else {
pull(in)
}
}
setHandlers(in, out, this)
}
override val shape: FlowShape[T, T] = FlowShape(in, out)
}
private[akka] case class KeepGoing[T]() extends PushPullStage[T, T] {
var lastElem: T = _
private[akka] final case class KeepGoing[T]() extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("KeepGoing.in")
val out = Outlet[T]("KeepGoing.out")
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
lastElem = elem
ctx.push(elem)
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
var lastElem: T = _
override def onPull(ctx: Context[T]): SyncDirective = {
if (ctx.isFinishing) {
ctx.push(lastElem)
} else ctx.pull()
}
override def onPush(): Unit = {
lastElem = grab(in)
push(out, lastElem)
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = ctx.absorbTermination()
override def onPull(): Unit = {
if (isClosed(in)) {
push(out, lastElem)
} else {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {}
setHandlers(in, out, this)
}
override val shape: FlowShape[T, T] = FlowShape(in, out)
}
// This test is related to issue #17351