Fix various interpreter issues, add extensive port transition tests

This commit is contained in:
Endre Sándor Varga 2015-09-24 15:32:06 +02:00
parent ae83053a64
commit ea03bb315d
7 changed files with 1111 additions and 225 deletions

View file

@ -1,7 +1,7 @@
package akka.stream
import akka.event._
import akka.stream.impl.fusing.{ InterpreterSpecKit, GraphInterpreterSpec, GraphStages, Map => MapStage, OneBoundedInterpreter }
import akka.stream.impl.fusing.{ GraphInterpreterSpecKit, GraphStages, Map => MapStage, OneBoundedInterpreter }
import akka.stream.impl.fusing.GraphStages.Identity
import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
import akka.stream.stage._
@ -26,34 +26,33 @@ class InterpreterBenchmark {
@Benchmark
@OperationsPerInvocation(100000)
def graph_interpreter_100k_elements() {
val lock = new Lock()
lock.acquire()
new GraphInterpreterSpec.TestSetup() {
val identities = Vector.fill(numberOfIds)(new Identity[Int])
val source = new GraphDataSource("source", data100k)
val sink = new GraphDataSink[Int]("sink", data100k.size, lock)
new GraphInterpreterSpecKit {
new TestSetup {
val identities = Vector.fill(numberOfIds)(new Identity[Int])
val source = new GraphDataSource("source", data100k)
val sink = new GraphDataSink[Int]("sink", data100k.size)
val b = builder(identities:_*)
.connect(source, identities.head.in)
.connect(identities.last.out, sink)
val b = builder(identities: _*)
.connect(source, identities.head.in)
.connect(identities.last.out, sink)
for (i <- (0 until identities.size - 1)) {
b.connect(identities(i).out, identities(i + 1).in)
// FIXME: This should not be here, this is pure setup overhead
for (i <- (0 until identities.size - 1)) {
b.connect(identities(i).out, identities(i + 1).in)
}
b.init()
sink.requestOne()
interpreter.execute(Int.MaxValue)
}
b.init()
sink.requestOne()
interpreter.execute(Int.MaxValue)
}
lock.acquire()
}
@Benchmark
@OperationsPerInvocation(100000)
def onebounded_interpreter_100k_elements() {
val lock = new Lock()
lock.acquire()
val sink = OneBoundedDataSink(data100k.size, lock)
val sink = OneBoundedDataSink(data100k.size)
// FIXME: This should not be here, this is pure setup overhead
val ops = Vector.fill(numberOfIds)(MapStage(identity[Int], Supervision.stoppingDecider))
val interpreter = new OneBoundedInterpreter(OneBoundedDataSource(data100k) +: ops :+ sink,
(op, ctx, event) (),
@ -63,7 +62,6 @@ class InterpreterBenchmark {
forkLimit = 100, overflowToHeap = false)
interpreter.init()
sink.requestOne()
lock.acquire()
}
}
@ -87,16 +85,14 @@ object InterpreterBenchmark {
})
}
case class GraphDataSink[T](override val toString: String, var expected: Int, completionLock: Lock) extends DownstreamBoundaryStageLogic[T] {
case class GraphDataSink[T](override val toString: String, var expected: Int) extends DownstreamBoundaryStageLogic[T] {
val in = Inlet[T]("in")
setHandler(in, new InHandler {
override def onPush(): Unit = {
expected -= 1
pull(in)
if (expected == 0) {
completionLock.release()
}
if (expected > 0) pull(in)
// Otherwise do nothing, it will exit the interpreter
}
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
@ -126,13 +122,11 @@ object InterpreterBenchmark {
throw new UnsupportedOperationException("Cannot push the boundary")
}
case class OneBoundedDataSink(var expected: Int, completionLock: Lock) extends BoundaryStage {
case class OneBoundedDataSink(var expected: Int) extends BoundaryStage {
override def onPush(elem: Any, ctx: BoundaryContext): Directive = {
expected -= 1
if (expected == 0) {
completionLock.release()
}
ctx.pull()
if (expected == 0) ctx.exit()
else ctx.pull()
}
override def onUpstreamFinish(ctx: BoundaryContext): TerminationDirective = {

View file

@ -0,0 +1,759 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
import akka.stream.testkit.AkkaSpec
class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit {
"Port states" must {
"properly transition on push and pull" in new PortTestSetup {
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.pull()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
stepAll()
lastEvents() should be(Set(RequestOne(out)))
out.isAvailable should be(true)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.push(0)
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
stepAll()
lastEvents() should be(Set(OnNext(in, 0)))
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(true)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
in.grab() should ===(0)
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
// Cycle completed
}
"drop ungrabbed element on pull" in new PortTestSetup {
in.pull()
step()
clearEvents()
out.push(0)
step()
lastEvents() should be(Set(OnNext(in, 0)))
in.pull()
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate complete while downstream is active" in new PortTestSetup {
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
out.complete()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
stepAll()
lastEvents() should be(Set(OnComplete(in)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate complete while upstream is active" in new PortTestSetup {
in.pull()
stepAll()
lastEvents() should be(Set(RequestOne(out)))
out.isAvailable should be(true)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
out.complete()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
stepAll()
lastEvents() should be(Set(OnComplete(in)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate complete while pull is in flight" in new PortTestSetup {
in.pull()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
out.complete()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
stepAll()
lastEvents() should be(Set(OnComplete(in)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate complete while push is in flight" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
out.complete()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
step()
lastEvents() should be(Set(OnNext(in, 0)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(true)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
in.grab() should ===(0)
an[IllegalArgumentException] should be thrownBy { in.grab() }
step()
lastEvents() should be(Set(OnComplete(in)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate complete while push is in flight and keep ungrabbed element" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
out.complete()
step()
lastEvents() should be(Set(OnNext(in, 0)))
step()
lastEvents() should be(Set(OnComplete(in)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(true)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
in.grab() should ===(0)
}
"ignore pull while completing" in new PortTestSetup {
out.complete()
in.pull()
stepAll()
lastEvents() should be(Set(OnComplete(in)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate cancel while downstream is active" in new PortTestSetup {
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
in.cancel()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { in.grab() }
stepAll()
lastEvents() should be(Set(Cancel(out)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate cancel while upstream is active" in new PortTestSetup {
in.pull()
stepAll()
lastEvents() should be(Set(RequestOne(out)))
out.isAvailable should be(true)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
in.cancel()
lastEvents() should be(Set.empty)
out.isAvailable should be(true)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { in.grab() }
stepAll()
lastEvents() should be(Set(Cancel(out)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate cancel while pull is in flight" in new PortTestSetup {
in.pull()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
in.cancel()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
stepAll()
lastEvents() should be(Set(Cancel(out)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate cancel while push is in flight" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
in.cancel()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
stepAll()
lastEvents() should be(Set(Cancel(out)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore push while cancelling" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
in.cancel()
out.push(0)
stepAll()
lastEvents() should be(Set(Cancel(out)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"clear ungrabbed element even when cancelled" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
stepAll()
lastEvents() should be(Set(OnNext(in, 0)))
in.cancel()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { in.grab() }
stepAll()
lastEvents() should be(Set(Cancel(out)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore any completion if they are concurrent (cancel first)" in new PortTestSetup {
in.cancel()
out.complete()
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore any completion if they are concurrent (complete first)" in new PortTestSetup {
out.complete()
in.cancel()
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore completion from a push-complete if cancelled while in flight" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
out.complete()
in.cancel()
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore completion from a push-complete if cancelled after onPush" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
out.complete()
step()
lastEvents() should be(Set(OnNext(in, 0)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(true)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
in.grab() should ===(0)
in.cancel()
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
}
}

View file

@ -3,16 +3,10 @@
*/
package akka.stream.impl.fusing
import akka.stream._
import akka.stream.impl.fusing.GraphInterpreterSpec.TestSetup
import akka.stream.stage.{ InHandler, OutHandler, GraphStage, GraphStageLogic }
import akka.stream.testkit.AkkaSpec
import GraphInterpreter._
import scala.collection.immutable
class GraphInterpreterSpec extends AkkaSpec {
import GraphInterpreterSpec._
class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
import GraphStages._
"GraphInterpreter" must {
@ -26,8 +20,8 @@ class GraphInterpreterSpec extends AkkaSpec {
val balance = new Balance[Int](2)
"implement identity" in new TestSetup {
val source = UpstreamProbe[Int]("source")
val sink = DownstreamProbe[Int]("sink")
val source = new UpstreamProbe[Int]("source")
val sink = new DownstreamProbe[Int]("sink")
builder(identity)
.connect(source, identity.in)
@ -70,8 +64,8 @@ class GraphInterpreterSpec extends AkkaSpec {
}
"implement detacher stage" in new TestSetup {
val source = UpstreamProbe[Int]("source")
val sink = DownstreamProbe[Int]("sink")
val source = new UpstreamProbe[Int]("source")
val sink = new DownstreamProbe[Int]("sink")
builder(detacher)
.connect(source, detacher.in)
@ -349,113 +343,3 @@ class GraphInterpreterSpec extends AkkaSpec {
}
}
object GraphInterpreterSpec {
sealed trait TestEvent {
def source: GraphStageLogic
}
case class OnComplete(source: GraphStageLogic) extends TestEvent
case class Cancel(source: GraphStageLogic) extends TestEvent
case class OnError(source: GraphStageLogic, cause: Throwable) extends TestEvent
case class OnNext(source: GraphStageLogic, elem: Any) extends TestEvent
case class RequestOne(source: GraphStageLogic) extends TestEvent
case class RequestAnother(source: GraphStageLogic) extends TestEvent
abstract class TestSetup {
private var lastEvent: Set[TestEvent] = Set.empty
private var _interpreter: GraphInterpreter = _
protected def interpreter: GraphInterpreter = _interpreter
class AssemblyBuilder(stages: Seq[GraphStage[_ <: Shape]]) {
var upstreams = Vector.empty[(UpstreamBoundaryStageLogic[_], Inlet[_])]
var downstreams = Vector.empty[(Outlet[_], DownstreamBoundaryStageLogic[_])]
var connections = Vector.empty[(Outlet[_], Inlet[_])]
def connect[T](upstream: UpstreamBoundaryStageLogic[T], in: Inlet[T]): AssemblyBuilder = {
upstreams :+= upstream -> in
this
}
def connect[T](out: Outlet[T], downstream: DownstreamBoundaryStageLogic[T]): AssemblyBuilder = {
downstreams :+= out -> downstream
this
}
def connect[T](out: Outlet[T], in: Inlet[T]): AssemblyBuilder = {
connections :+= out -> in
this
}
def init(): Unit = {
val ins = upstreams.map(_._2) ++ connections.map(_._2)
val outs = connections.map(_._1) ++ downstreams.map(_._1)
val inOwners = ins.map { in stages.indexWhere(_.shape.inlets.contains(in)) }
val outOwners = outs.map { out stages.indexWhere(_.shape.outlets.contains(out)) }
val assembly = GraphAssembly(
stages.toArray,
(ins ++ Vector.fill(downstreams.size)(null)).toArray,
(inOwners ++ Vector.fill(downstreams.size)(-1)).toArray,
(Vector.fill(upstreams.size)(null) ++ outs).toArray,
(Vector.fill(upstreams.size)(-1) ++ outOwners).toArray)
val (inHandlers, outHandlers, logics, _) = assembly.materialize()
_interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ())
for ((upstream, i) upstreams.zipWithIndex) {
_interpreter.attachUpstreamBoundary(i, upstream._1)
}
for ((downstream, i) downstreams.zipWithIndex) {
_interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2)
}
_interpreter.init()
}
}
def manualInit(assembly: GraphAssembly): Unit = {
val (inHandlers, outHandlers, logics, _) = assembly.materialize()
_interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ())
}
def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq)
def lastEvents(): Set[TestEvent] = {
val result = lastEvent
lastEvent = Set.empty
result
}
case class UpstreamProbe[T](override val toString: String) extends UpstreamBoundaryStageLogic[T] {
val out = Outlet[T]("out")
setHandler(out, new OutHandler {
override def onPull(): Unit = lastEvent += RequestOne(UpstreamProbe.this)
})
def onNext(elem: T, eventLimit: Int = Int.MaxValue): Unit = {
if (GraphInterpreter.Debug) println(s"----- NEXT: $this $elem")
push(out, elem)
interpreter.execute(eventLimit)
}
}
case class DownstreamProbe[T](override val toString: String) extends DownstreamBoundaryStageLogic[T] {
val in = Inlet[T]("in")
setHandler(in, new InHandler {
override def onPush(): Unit = lastEvent += OnNext(DownstreamProbe.this, grab(in))
})
def requestOne(eventLimit: Int = Int.MaxValue): Unit = {
if (GraphInterpreter.Debug) println(s"----- REQ $this")
pull(in)
interpreter.execute(eventLimit)
}
}
}
}

View file

@ -0,0 +1,176 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
import akka.stream.{ NoMaterializer, Outlet, Inlet, Shape }
import akka.stream.impl.fusing.GraphInterpreter.{ GraphAssembly, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
import akka.stream.stage.{ InHandler, OutHandler, GraphStage, GraphStageLogic }
trait GraphInterpreterSpecKit {
sealed trait TestEvent {
def source: GraphStageLogic
}
case class OnComplete(source: GraphStageLogic) extends TestEvent
case class Cancel(source: GraphStageLogic) extends TestEvent
case class OnError(source: GraphStageLogic, cause: Throwable) extends TestEvent
case class OnNext(source: GraphStageLogic, elem: Any) extends TestEvent
case class RequestOne(source: GraphStageLogic) extends TestEvent
case class RequestAnother(source: GraphStageLogic) extends TestEvent
abstract class TestSetup {
protected var lastEvent: Set[TestEvent] = Set.empty
private var _interpreter: GraphInterpreter = _
protected def interpreter: GraphInterpreter = _interpreter
class AssemblyBuilder(stages: Seq[GraphStage[_ <: Shape]]) {
var upstreams = Vector.empty[(UpstreamBoundaryStageLogic[_], Inlet[_])]
var downstreams = Vector.empty[(Outlet[_], DownstreamBoundaryStageLogic[_])]
var connections = Vector.empty[(Outlet[_], Inlet[_])]
def connect[T](upstream: UpstreamBoundaryStageLogic[T], in: Inlet[T]): AssemblyBuilder = {
upstreams :+= upstream -> in
this
}
def connect[T](out: Outlet[T], downstream: DownstreamBoundaryStageLogic[T]): AssemblyBuilder = {
downstreams :+= out -> downstream
this
}
def connect[T](out: Outlet[T], in: Inlet[T]): AssemblyBuilder = {
connections :+= out -> in
this
}
def init(): Unit = {
val ins = upstreams.map(_._2) ++ connections.map(_._2)
val outs = connections.map(_._1) ++ downstreams.map(_._1)
val inOwners = ins.map { in stages.indexWhere(_.shape.inlets.contains(in)) }
val outOwners = outs.map { out stages.indexWhere(_.shape.outlets.contains(out)) }
val assembly = GraphAssembly(
stages.toArray,
(ins ++ Vector.fill(downstreams.size)(null)).toArray,
(inOwners ++ Vector.fill(downstreams.size)(-1)).toArray,
(Vector.fill(upstreams.size)(null) ++ outs).toArray,
(Vector.fill(upstreams.size)(-1) ++ outOwners).toArray)
val (inHandlers, outHandlers, logics, _) = assembly.materialize()
_interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ())
for ((upstream, i) upstreams.zipWithIndex) {
_interpreter.attachUpstreamBoundary(i, upstream._1)
}
for ((downstream, i) downstreams.zipWithIndex) {
_interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2)
}
_interpreter.init()
}
}
def manualInit(assembly: GraphAssembly): Unit = {
val (inHandlers, outHandlers, logics, _) = assembly.materialize()
_interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ())
}
def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq)
def lastEvents(): Set[TestEvent] = {
val result = lastEvent
clearEvents()
result
}
def clearEvents(): Unit = lastEvent = Set.empty
class UpstreamProbe[T](override val toString: String) extends UpstreamBoundaryStageLogic[T] {
val out = Outlet[T]("out")
setHandler(out, new OutHandler {
override def onPull(): Unit = lastEvent += RequestOne(UpstreamProbe.this)
override def onDownstreamFinish() = lastEvent += Cancel(UpstreamProbe.this)
})
def onNext(elem: T, eventLimit: Int = Int.MaxValue): Unit = {
if (GraphInterpreter.Debug) println(s"----- NEXT: $this $elem")
push(out, elem)
interpreter.execute(eventLimit)
}
}
class DownstreamProbe[T](override val toString: String) extends DownstreamBoundaryStageLogic[T] {
val in = Inlet[T]("in")
setHandler(in, new InHandler {
override def onPush(): Unit = lastEvent += OnNext(DownstreamProbe.this, grab(in))
override def onUpstreamFinish() = lastEvent += OnComplete(DownstreamProbe.this)
override def onUpstreamFailure(ex: Throwable) = OnError(DownstreamProbe.this, ex)
})
def requestOne(eventLimit: Int = Int.MaxValue): Unit = {
if (GraphInterpreter.Debug) println(s"----- REQ $this")
pull(in)
interpreter.execute(eventLimit)
}
}
}
abstract class PortTestSetup extends TestSetup {
val out = new UpstreamPortProbe[Int]
val in = new DownstreamPortProbe[Int]
class UpstreamPortProbe[T] extends UpstreamProbe[T]("upstreamPort") {
def isAvailable: Boolean = isAvailable(out)
def isClosed: Boolean = isClosed(out)
def push(elem: T): Unit = push(out, elem)
def complete(): Unit = complete(out)
def fail(ex: Throwable): Unit = fail(out, ex)
}
class DownstreamPortProbe[T] extends DownstreamProbe[T]("upstreamPort") {
def isAvailable: Boolean = isAvailable(in)
def hasBeenPulled: Boolean = hasBeenPulled(in)
def isClosed: Boolean = isClosed(in)
def pull(): Unit = pull(in)
def cancel(): Unit = cancel(in)
def grab(): T = grab(in)
setHandler(in, new InHandler {
// Modified onPush that does not grab() automatically the element. This accesses some internals.
override def onPush(): Unit =
lastEvent +=
OnNext(
DownstreamPortProbe.this,
interpreter.connectionStates(inToConn(in)))
override def onUpstreamFinish() = lastEvent += OnComplete(DownstreamPortProbe.this)
override def onUpstreamFailure(ex: Throwable) = OnError(DownstreamPortProbe.this, ex)
})
}
def stepAll(): Unit = interpreter.execute(eventLimit = Int.MaxValue)
def step(): Unit = interpreter.execute(eventLimit = 1)
private val assembly = GraphAssembly(
stages = Array.empty,
ins = Array(null),
inOwners = Array(-1),
outs = Array(null),
outOwners = Array(-1))
manualInit(assembly)
interpreter.attachDownstreamBoundary(0, in)
interpreter.attachUpstreamBoundary(0, out)
interpreter.init()
}
}

View file

@ -3,6 +3,8 @@
*/
package akka.stream.impl.fusing
import java.util.concurrent.TimeoutException
import akka.actor._
import akka.event.Logging
import akka.stream._
@ -120,6 +122,7 @@ private[stream] object ActorGraphInterpreter {
}
def cancel(): Unit = {
downstreamCanceled = true
if (!upstreamCompleted) {
upstreamCompleted = true
if (upstream ne null) tryCancel(upstream)
@ -137,7 +140,7 @@ private[stream] object ActorGraphInterpreter {
}
def onError(e: Throwable): Unit =
if (!upstreamCompleted) {
if (!upstreamCompleted || !downstreamCanceled) {
upstreamCompleted = true
clear()
fail(out, e)
@ -200,8 +203,6 @@ private[stream] object ActorGraphInterpreter {
// This flag is only used if complete/fail is called externally since this op turns into a Finished one inside the
// interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked)
private var downstreamCompleted = false
// this is true while we hold the ball; while false incoming demand will just be queued up
private var upstreamWaiting = true
// when upstream failed before we got the exposed publisher
private var upstreamFailed: Option[Throwable] = None
@ -267,7 +268,7 @@ private[stream] object ActorGraphInterpreter {
downstreamDemand += elements
if (downstreamDemand < 0)
downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
if (!hasBeenPulled(in)) pull(in)
if (!hasBeenPulled(in) && !isClosed(in)) pull(in)
}
}
@ -303,16 +304,19 @@ private[stream] class ActorGraphInterpreter(
logics,
(logic, event, handler) self ! AsyncInput(logic, event, handler))
val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _))
val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _))
private val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _))
private val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _))
private var subscribesPending = inputs.length
// Limits the number of events processed by the interpreter before scheduling a self-message for fairness with other
// actors.
// TODO: Better heuristic here
val eventLimit = settings.maxInputBufferSize * assembly.stages.length * 4 // Roughly 4 events per element transfer
// TODO: Better heuristic here (take into account buffer size, connection count, 4 events per element, have a max)
val eventLimit = settings.maxInputBufferSize * (inputs.length + outputs.length) * 2
// Limits the number of events processed by the interpreter on an abort event.
// TODO: Better heuristic here
val abortLimit = eventLimit * 2
var resumeScheduled = false
private val abortLimit = eventLimit * 2
private var resumeScheduled = false
override def preStart(): Unit = {
var i = 0
@ -350,7 +354,6 @@ private[stream] class ActorGraphInterpreter(
case NonFatal(e) logic.failStage(e)
}
}
runBatch()
// Initialization and completion messages
@ -363,6 +366,7 @@ private[stream] class ActorGraphInterpreter(
inputs(id).onComplete()
runBatch()
case OnSubscribe(id: Int, subscription: Subscription)
subscribesPending -= 1
inputs(id).onSubscribe(subscription)
case Cancel(id: Int)
if (GraphInterpreter.Debug) println(s" cancel id=$id")
@ -375,16 +379,28 @@ private[stream] class ActorGraphInterpreter(
}
override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
super.aroundReceive(receive, msg)
private def waitShutdown: Receive = {
case OnSubscribe(_, sub)
tryCancel(sub)
subscribesPending -= 1
if (subscribesPending == 0) context.stop(self)
case ReceiveTimeout
tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " +
s"inputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now."))
case _ // Ignore, there is nothing to do anyway
}
private def runBatch(): Unit = {
try {
interpreter.execute(eventLimit)
if (interpreter.isCompleted) context.stop(self)
else if (interpreter.isSuspended && !resumeScheduled) {
if (interpreter.isCompleted) {
// Cannot stop right away if not completely subscribed
if (subscribesPending == 0) context.stop(self)
else {
context.become(waitShutdown)
context.setReceiveTimeout(settings.subscriptionTimeoutSettings.timeout)
}
} else if (interpreter.isSuspended && !resumeScheduled) {
resumeScheduled = true
self ! Resume
}

View file

@ -26,16 +26,25 @@ private[stream] object GraphInterpreter {
case object Empty
sealed trait ConnectionState
sealed trait CompletedState extends ConnectionState
case object Pushable extends ConnectionState
case object Completed extends CompletedState
final case class PushCompleted(element: Any) extends ConnectionState
case object Cancelled extends CompletedState
final case class Failed(ex: Throwable) extends CompletedState
case object Pulled extends ConnectionState
sealed trait HasElementState
sealed trait CompletingState extends ConnectionState
final case class CompletedHasElement(element: Any) extends CompletingState with HasElementState
final case class PushCompleted(element: Any) extends CompletingState with HasElementState
case object Completed extends CompletingState
case object Cancelled extends CompletingState
final case class Failed(ex: Throwable) extends CompletingState
val NoEvent = -1
val Boundary = -1
sealed trait PortState
case object InFlight extends PortState
case object Available extends PortState
case object Closed extends PortState
abstract class UpstreamBoundaryStageLogic[T] extends GraphStageLogic {
def out: Outlet[T]
}
@ -160,10 +169,11 @@ private[stream] object GraphInterpreter {
* while in the practical sense a connection is a number which represents slots in certain arrays.
* In particular
* - connectionStates is a mapping from a connection id to a current (or future) state of the connection
* - inAvailable is a mapping from a connection to a boolean that indicates whether the input corresponding
* to the connection is currently pullable
* - outAvailable is a mapping from a connection to a boolean that indicates whether the input corresponding
* to the connection is currently pushable
* - inStates is a mapping from a connection to a [[akka.stream.impl.fusing.GraphInterpreter.PortState]]
* that indicates whether the input corresponding
* to the connection is currently pullable or completed
* - outStates is a mapping from a connection to a [[akka.stream.impl.fusing.GraphInterpreter.PortState]]
* that indicates whether the input corresponding to the connection is currently pushable or completed
* - inHandlers is a mapping from a connection id to the [[InHandler]] instance that handles the events corresponding
* to the input port of the connection
* - outHandlers is a mapping from a connection id to the [[OutHandler]] instance that handles the events corresponding
@ -177,14 +187,14 @@ private[stream] object GraphInterpreter {
*
* Sending an event is usually the following sequence:
* - An action is requested by a stage logic (push, pull, complete, etc.)
* - the availability of the port is set on the sender side to false (inAvailable or outAvailable)
* - the availability of the port is set on the sender side to Limbo (inStates or outStates)
* - the scheduled event is put in the slot of the connection in the connectionStates table
* - the id of the affected connection is enqueued
*
* Receiving an event is usually the following sequence:
* - id of connection to be processed is dequeued
* - the type of the event is determined by the object in the corresponding connectionStates slot
* - the availability of the port is set on the receiver side to be true (inAvailable or outAvailable)
* - the availability of the port is set on the receiver side to be Available (inStates or outStates)
* - using the inHandlers/outHandlers table the corresponding callback is called on the stage logic.
*
* Because of the FIFO construction of the queue the interpreter is fair, i.e. a pending event is always executed
@ -203,19 +213,19 @@ private[stream] final class GraphInterpreter(
// Maintains the next event (and state) of the connection.
// Technically the connection cannot be considered being in the state that is encoded here before the enqueued
// connection event has been processed. The inAvailable and outAvailable arrays usually protect access to this
// connection event has been processed. The inStates and outStates arrays usually protect access to this
// field while it is in transient state.
val connectionStates = Array.fill[Any](assembly.connectionCount)(Empty)
// Indicates whether the input port is pullable. After pulling it becomes false
// Be aware that when inAvailable goes to false outAvailable does not become true immediately, only after
// the corresponding event in the queue has been processed
val inAvailable = Array.fill[Boolean](assembly.connectionCount)(true)
val inStates = Array.fill[PortState](assembly.connectionCount)(Available)
// Indicates whether the output port is pushable. After pushing it becomes false
// Be aware that when inAvailable goes to false outAvailable does not become true immediately, only after
// the corresponding event in the queue has been processed
val outAvailable = Array.fill[Boolean](assembly.connectionCount)(false)
val outStates = Array.fill[PortState](assembly.connectionCount)(InFlight)
// The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be
// completed
@ -228,8 +238,8 @@ private[stream] final class GraphInterpreter(
}
// An event queue implemented as a circular buffer
private val mask = 255
private val eventQueue = Array.ofDim[Int](256)
private val mask = eventQueue.length - 1
private var queueHead: Int = 0
private var queueTail: Int = 0
@ -306,6 +316,7 @@ private[stream] final class GraphInterpreter(
* true.
*/
def execute(eventLimit: Int): Unit = {
if (GraphInterpreter.Debug) println("---------------- EXECUTE")
var eventsRemaining = eventLimit
var connection = dequeue()
while (eventsRemaining > 0 && connection != NoEvent) {
@ -314,13 +325,14 @@ private[stream] final class GraphInterpreter(
case NonFatal(e)
val stageId = connectionStates(connection) match {
case Failed(ex) throw new IllegalStateException("Double fault. Failure while handling failure.", e)
case Pushable assembly.outOwners(connection)
case Pulled assembly.outOwners(connection)
case Completed assembly.inOwners(connection)
case Cancelled assembly.outOwners(connection)
case PushCompleted(elem) assembly.inOwners(connection)
case pushedElem assembly.inOwners(connection)
}
logics(stageId).failStage(e)
if (stageId == Boundary) throw e
else logics(stageId).failStage(e)
}
eventsRemaining -= 1
if (eventsRemaining > 0) connection = dequeue()
@ -334,47 +346,55 @@ private[stream] final class GraphInterpreter(
def processElement(elem: Any): Unit = {
if (!isStageCompleted(assembly.inOwners(connection))) {
if (GraphInterpreter.Debug) println(s"PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, $elem")
inAvailable(connection) = true
inStates(connection) = Available
inHandlers(connection).onPush()
}
}
connectionStates(connection) match {
case Pushable
case Pulled
if (!isStageCompleted(assembly.outOwners(connection))) {
if (GraphInterpreter.Debug) println(s"PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)}")
outAvailable(connection) = true
outStates(connection) = Available
outHandlers(connection).onPull()
}
case Completed
case Completed | CompletedHasElement(_)
val stageId = assembly.inOwners(connection)
if (!isStageCompleted(stageId)) {
if (!isStageCompleted(stageId) && inStates(connection) != Closed) {
if (GraphInterpreter.Debug) println(s"COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)}")
inAvailable(connection) = false
inStates(connection) = Closed
inHandlers(connection).onUpstreamFinish()
completeConnection(stageId)
}
case Failed(ex)
val stageId = assembly.inOwners(connection)
if (!isStageCompleted(stageId)) {
if (!isStageCompleted(stageId) && inStates(connection) != Closed) {
if (GraphInterpreter.Debug) println(s"FAIL ${outOwnerName(connection)} -> ${inOwnerName(connection)}")
inAvailable(connection) = false
inStates(connection) = Closed
inHandlers(connection).onUpstreamFailure(ex)
completeConnection(stageId)
}
case Cancelled
val stageId = assembly.outOwners(connection)
if (!isStageCompleted(stageId)) {
if (!isStageCompleted(stageId) && outStates(connection) != Closed) {
if (GraphInterpreter.Debug) println(s"CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)}")
outAvailable(connection) = false
outStates(connection) = Closed
outHandlers(connection).onDownstreamFinish()
completeConnection(stageId)
}
case PushCompleted(elem)
inAvailable(connection) = true
connectionStates(connection) = elem
processElement(elem)
enqueue(connection, Completed)
val stageId = assembly.inOwners(connection)
if (!isStageCompleted(stageId) && inStates(connection) != Closed) {
inStates(connection) = Available
connectionStates(connection) = elem
processElement(elem)
val elemAfter = connectionStates(connection)
if (elemAfter == Empty) enqueue(connection, Completed)
else enqueue(connection, CompletedHasElement(elemAfter))
} else {
connectionStates(connection) = Completed
}
case pushedElem processElement(pushedElem)
}
@ -402,14 +422,17 @@ private[stream] final class GraphInterpreter(
// to prevent redundant completion events in case of concurrent invocation on both sides of the connection.
// I.e. when one side already enqueued the completion event, then the other side will not enqueue the event since
// there is noone to process it anymore.
def isConnectionCompleted(connection: Int): Boolean = connectionStates(connection).isInstanceOf[CompletedState]
def isConnectionCompleting(connection: Int): Boolean = connectionStates(connection).isInstanceOf[CompletingState]
// Returns true if the given stage is alredy completed
def isStageCompleted(stageId: Int): Boolean = stageId != Boundary && shutdownCounter(stageId) == 0
private def isPushInFlight(connection: Int): Boolean =
!inAvailable(connection) &&
!connectionStates(connection).isInstanceOf[ConnectionState] &&
(inStates(connection) == InFlight) && // Other side has not been notified
hasElement(connection)
private def hasElement(connection: Int): Boolean =
!connectionStates(connection).isInstanceOf[ConnectionState] &&
connectionStates(connection) != Empty
// Register that a connection in which the given stage participated has been completed and therefore the stage
@ -430,37 +453,46 @@ private[stream] final class GraphInterpreter(
}
private[stream] def push(connection: Int, elem: Any): Unit = {
outAvailable(connection) = false
enqueue(connection, elem)
if (!(inStates(connection) eq Closed)) {
outStates(connection) = InFlight
enqueue(connection, elem)
}
}
private[stream] def pull(connection: Int): Unit = {
inAvailable(connection) = false
enqueue(connection, Pushable)
if (!(outStates(connection) eq Closed)) {
inStates(connection) = InFlight
enqueue(connection, Pulled)
}
}
private[stream] def complete(connection: Int): Unit = {
outAvailable(connection) = false
if (!isConnectionCompleted(connection)) {
// There is a pending push, we change the signal to be a PushCompleted (there can be only one signal in flight
// for a connection)
if (isPushInFlight(connection))
connectionStates(connection) = PushCompleted(connectionStates(connection))
else
enqueue(connection, Completed)
outStates(connection) = Closed
if (!isConnectionCompleting(connection) && (inStates(connection) ne Closed)) {
if (hasElement(connection)) {
// There is a pending push, we change the signal to be a PushCompleted (there can be only one signal in flight
// for a connection)
if (inStates(connection) == InFlight)
connectionStates(connection) = PushCompleted(connectionStates(connection))
else enqueue(connection, CompletedHasElement(connectionStates(connection)))
} else enqueue(connection, Completed)
}
completeConnection(assembly.outOwners(connection))
}
private[stream] def fail(connection: Int, ex: Throwable): Unit = {
outAvailable(connection) = false
if (!isConnectionCompleted(connection)) enqueue(connection, Failed(ex))
outStates(connection) = Closed
if (!isConnectionCompleting(connection) && (inStates(connection) ne Closed))
enqueue(connection, Failed(ex))
completeConnection(assembly.outOwners(connection))
}
private[stream] def cancel(connection: Int): Unit = {
inAvailable(connection) = false
if (!isConnectionCompleted(connection)) enqueue(connection, Cancelled)
inStates(connection) = Closed
if (!isConnectionCompleting(connection) && (outStates(connection) ne Closed))
enqueue(connection, Cancelled)
completeConnection(assembly.inOwners(connection))
}

View file

@ -154,11 +154,12 @@ abstract class GraphStageLogic {
*/
final protected def pull[T](in: Inlet[T]): Unit = {
require(!hasBeenPulled(in), "Cannot pull port twice")
require(!isClosed(in), "Cannot pull closed port")
interpreter.pull(conn(in))
}
/**
* Requests to stop receiving events from a given input port.
* Requests to stop receiving events from a given input port. Cancelling clears any ungrabbed elements from the port.
*/
final protected def cancel[T](in: Inlet[T]): Unit = interpreter.cancel(conn(in))
@ -173,15 +174,22 @@ abstract class GraphStageLogic {
require(isAvailable(in), "Cannot get element from already empty input port")
val connection = conn(in)
val elem = interpreter.connectionStates(connection)
interpreter.connectionStates(connection) = Empty
elem.asInstanceOf[T]
elem match {
case CompletedHasElement(realElem)
interpreter.connectionStates(connection) = Completed
realElem.asInstanceOf[T]
case _
interpreter.connectionStates(connection) = Empty
elem.asInstanceOf[T]
}
}
/**
* Indicates whether there is already a pending pull for the given input port. If this method returns true
* then [[isAvailable()]] must return false for that same port.
*/
final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = !interpreter.inAvailable(conn(in))
final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = interpreter.inStates(conn(in)) eq InFlight
/**
* Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the
@ -191,9 +199,20 @@ abstract class GraphStageLogic {
*/
final protected def isAvailable[T](in: Inlet[T]): Boolean = {
val connection = conn(in)
interpreter.inAvailable(connection) && !(interpreter.connectionStates(connection) == Empty)
val state = interpreter.connectionStates(connection)
val arrived = interpreter.inStates(connection) ne InFlight
val hasElementState = state.isInstanceOf[HasElementState]
val rawElement = (state != Empty) && !state.isInstanceOf[ConnectionState]
arrived && (hasElementState || rawElement)
}
/**
* Indicates whether the port has been closed. A closed port cannot be pulled.
*/
final protected def isClosed[T](in: Inlet[T]): Boolean = interpreter.inStates(conn(in)) eq Closed
/**
* Emits an element through the given output port. Calling this method twice before a [[pull()]] has been arrived
* will fail. There can be only one outstanding push request at any given time. The method [[isAvailable()]] can be
@ -201,6 +220,7 @@ abstract class GraphStageLogic {
*/
final protected def push[T](out: Outlet[T], elem: T): Unit = {
require(isAvailable(out), "Cannot push port twice")
require(!isClosed(out), "Cannot pull closed port")
interpreter.push(conn(out), elem)
}
@ -235,7 +255,12 @@ abstract class GraphStageLogic {
/**
* Return true if the given output port is ready to be pushed.
*/
final def isAvailable[T](out: Outlet[T]): Boolean = interpreter.outAvailable(conn(out))
final def isAvailable[T](out: Outlet[T]): Boolean = interpreter.outStates(conn(out)) eq Available
/**
* Indicates whether the port has been closed. A closed port cannot be pushed.
*/
final protected def isClosed[T](out: Outlet[T]): Boolean = interpreter.outStates(conn(out)) eq Closed
/**
* Obtain a callback object that can be used asynchronously to re-enter the