=str: Optmizing GraphInterpreter

This commit is contained in:
Endre Sándor Varga 2015-09-25 16:36:53 +02:00
parent e7a14e53a9
commit f4b614a186
19 changed files with 996 additions and 273 deletions

View file

@ -8,10 +8,11 @@ object BenchRunner {
def main(args: Array[String]) = {
import scala.collection.JavaConversions._
val args2 = args.toList match {
case "quick" :: tail => "-i 1 -wi 1 -f1 -t1".split(" ").toList ::: tail
case "full" :: tail => "-i 10 -wi 4 -f3 -t1".split(" ").toList ::: tail
case other => other
val args2 = args.toList.flatMap {
case "quick" => "-i 1 -wi 1 -f1 -t1".split(" ").toList
case "full" => "-i 10 -wi 4 -f3 -t1".split(" ").toList
case "jitwatch" => "-jvmArgs=-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation" :: Nil
case other => other :: Nil
}
val opts = new CommandLineOptions(args2: _*)

View file

@ -12,6 +12,7 @@ import org.openjdk.jmh.annotations._
import scala.concurrent.Lock
import scala.util.Success
import akka.stream.impl.fusing.GraphStages
import org.reactivestreams._
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@ -25,6 +26,15 @@ class FlowMapBenchmark {
log-dead-letters-during-shutdown = off
loglevel = "WARNING"
actor.default-dispatcher {
#executor = "thread-pool-executor"
throughput = 1024
}
actor.default-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
test {
timefactor = 1.0
filter-leeway = 3s
@ -40,10 +50,8 @@ class FlowMapBenchmark {
var materializer: ActorMaterializer = _
final val UseGraphStageIdentity = false
// manual, and not via @Param, because we want @OperationsPerInvocation on our tests
final val data100k = (1 to 100000).toVector
@Param(Array("true", "false"))
val UseGraphStageIdentity = false
final val successMarker = Success(1)
final val successFailure = Success(new Exception)
@ -51,7 +59,7 @@ class FlowMapBenchmark {
// safe to be benchmark scoped because the flows we construct in this bench are stateless
var flow: Source[Int, Unit] = _
@Param(Array("2", "8")) // todo
@Param(Array("8", "32", "128"))
val initialInputBufferSize = 0
@Param(Array("1", "5", "10"))
@ -60,11 +68,38 @@ class FlowMapBenchmark {
@Setup
def setup() {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialInputBufferSize, 16)
.withInputBuffer(initialInputBufferSize, initialInputBufferSize)
materializer = ActorMaterializer(settings)
flow = mkMaps(Source(data100k), numberOfMapOps) {
// Important to use a synchronous, zero overhead source, otherwise the slowness of the source
// might bias the benchmark, since the stream always adjusts the rate to the slowest stage.
val syncTestPublisher = new Publisher[Int] {
override def subscribe(s: Subscriber[_ >: Int]): Unit = {
val sub = new Subscription {
var counter = 0 // Piggyback on caller thread, no need for volatile
override def request(n: Long): Unit = {
var i = n
while (i > 0) {
s.onNext(counter)
counter += 1
if (counter == 100000) {
s.onComplete()
return
}
i -= 1
}
}
override def cancel(): Unit = ()
}
s.onSubscribe(sub)
}
}
flow = mkMaps(Source(syncTestPublisher), numberOfMapOps) {
if (UseGraphStageIdentity)
new GraphStages.Identity[Int]
else

View file

@ -51,9 +51,13 @@ class InterpreterBenchmark {
@Benchmark
@OperationsPerInvocation(100000)
def onebounded_interpreter_100k_elements() {
val lock = new Lock()
lock.acquire()
val sink = OneBoundedDataSink(data100k.size)
// FIXME: This should not be here, this is pure setup overhead
val ops = Vector.fill(numberOfIds)(MapStage(identity[Int], Supervision.stoppingDecider))
val ops = Vector.fill(numberOfIds)(new PushPullStage[Int, Int] {
override def onPull(ctx: _root_.akka.stream.stage.Context[Int]) = ctx.pull()
override def onPush(elem: Int, ctx: _root_.akka.stream.stage.Context[Int]) = ctx.push(elem)
})
val interpreter = new OneBoundedInterpreter(OneBoundedDataSource(data100k) +: ops :+ sink,
(op, ctx, event) (),
Logging(NoopBus, classOf[InterpreterBenchmark]),
@ -70,6 +74,7 @@ object InterpreterBenchmark {
case class GraphDataSource[T](override val toString: String, data: Vector[T]) extends UpstreamBoundaryStageLogic[T] {
var idx = 0
val out = Outlet[T]("out")
out.id = 0
setHandler(out, new OutHandler {
override def onPull(): Unit = {
@ -87,6 +92,7 @@ object InterpreterBenchmark {
case class GraphDataSink[T](override val toString: String, var expected: Int) extends DownstreamBoundaryStageLogic[T] {
val in = Inlet[T]("in")
in.id = 0
setHandler(in, new InHandler {
override def onPush(): Unit = {

View file

@ -138,7 +138,7 @@ package util {
val out = Outlet[HttpEntity.Strict]("out")
override val shape = FlowShape(in, out)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
var bytes = ByteString.newBuilder
private var emptyStream = false

View file

@ -47,7 +47,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec {
val out2 = Outlet[Int]("out2")
val shape = BidiShape(in1, out1, in2, out2)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in1, new InHandler {
override def onPush(): Unit = push(out1, grab(in1))
override def onUpstreamFinish(): Unit = complete(out1)
@ -88,7 +88,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec {
val out2 = Outlet[Int]("out2")
val shape = BidiShape(in1, out1, in2, out2)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in1, new InHandler {
override def onPush(): Unit = push(out1, grab(in1))
@ -134,7 +134,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec {
val out2 = Outlet[Int]("out2")
val shape = BidiShape(in1, out1, in2, out2)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in1, new InHandler {
override def onPush(): Unit = push(out1, grab(in1))
@ -183,7 +183,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec {
val out2 = Outlet[Int]("out2")
val shape = BidiShape(in1, out1, in2, out2)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in1, new InHandler {
override def onPush(): Unit = push(out2, grab(in1))

View file

@ -0,0 +1,110 @@
package akka.stream.impl.fusing
import akka.stream.testkit.Utils.TE
import akka.testkit.EventFilter
class GraphInterpreterFailureModesSpec extends GraphInterpreterSpecKit {
"GraphInterpreter" must {
"handle failure on onPull" in new FailingStageSetup {
lastEvents() should be(Set(PreStart(stage)))
downstream.pull()
failOnNextEvent()
stepAll()
lastEvents() should be(Set(Cancel(upstream), OnError(downstream, testException), PostStop(stage)))
}
"handle failure on onPush" in new FailingStageSetup {
lastEvents() should be(Set(PreStart(stage)))
downstream.pull()
stepAll()
clearEvents()
upstream.push(0)
failOnNextEvent()
stepAll()
lastEvents() should be(Set(Cancel(upstream), OnError(downstream, testException), PostStop(stage)))
}
"handle failure on onPull while cancel is pending" in new FailingStageSetup {
lastEvents() should be(Set(PreStart(stage)))
downstream.pull()
downstream.cancel()
failOnNextEvent()
stepAll()
lastEvents() should be(Set(Cancel(upstream), PostStop(stage)))
}
"handle failure on onPush while complete is pending" in new FailingStageSetup {
lastEvents() should be(Set(PreStart(stage)))
downstream.pull()
stepAll()
clearEvents()
upstream.push(0)
upstream.complete()
failOnNextEvent()
stepAll()
lastEvents() should be(Set(OnError(downstream, testException), PostStop(stage)))
}
"handle failure on onUpstreamFinish" in new FailingStageSetup {
lastEvents() should be(Set(PreStart(stage)))
upstream.complete()
failOnNextEvent()
stepAll()
lastEvents() should be(Set(OnError(downstream, testException), PostStop(stage)))
}
"handle failure on onUpstreamFailure" in new FailingStageSetup {
lastEvents() should be(Set(PreStart(stage)))
upstream.fail(TE("another exception")) // this is not the exception that will be propagated
failOnNextEvent()
stepAll()
lastEvents() should be(Set(OnError(downstream, testException), PostStop(stage)))
}
"handle failure on onDownstreamFinish" in new FailingStageSetup {
lastEvents() should be(Set(PreStart(stage)))
downstream.cancel()
failOnNextEvent()
stepAll()
lastEvents() should be(Set(Cancel(upstream), PostStop(stage)))
}
"handle failure in preStart" in new FailingStageSetup(initFailOnNextEvent = true) {
stepAll()
lastEvents() should be(Set(Cancel(upstream), OnError(downstream, testException), PostStop(stage)))
}
"handle failure in postStop" in new FailingStageSetup {
lastEvents() should be(Set(PreStart(stage)))
upstream.complete()
downstream.cancel()
failOnPostStop()
EventFilter.error("Error during postStop in [stage]").intercept {
stepAll()
lastEvents() should be(Set.empty)
}
}
}
}

View file

@ -4,11 +4,14 @@
package akka.stream.impl.fusing
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.Utils._
class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit {
class GraphInterpreterPortsSpec extends GraphInterpreterSpecKit {
"Port states" must {
// FIXME test failure scenarios
"properly transition on push and pull" in new PortTestSetup {
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
@ -352,6 +355,32 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit {
in.grab() should ===(0)
}
"propagate complete while push is in flight and pulled after the push" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
out.complete()
step()
lastEvents() should be(Set(OnNext(in, 0)))
in.grab() should ===(0)
in.pull()
stepAll()
lastEvents() should be(Set(OnComplete(in)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore pull while completing" in new PortTestSetup {
out.complete()
in.pull()
@ -754,6 +783,392 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit {
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"not allow to grab element before it arrives" in new PortTestSetup {
in.pull()
stepAll()
out.push(0)
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"not allow to grab element if already cancelled" in new PortTestSetup {
in.pull()
stepAll()
out.push(0)
in.cancel()
stepAll()
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate failure while downstream is active" in new PortTestSetup {
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
out.fail(TE("test"))
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
stepAll()
lastEvents() should be(Set(OnError(in, TE("test"))))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate failure while upstream is active" in new PortTestSetup {
in.pull()
stepAll()
lastEvents() should be(Set(RequestOne(out)))
out.isAvailable should be(true)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
out.fail(TE("test"))
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
stepAll()
lastEvents() should be(Set(OnError(in, TE("test"))))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate failure while pull is in flight" in new PortTestSetup {
in.pull()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
out.fail(TE("test"))
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
stepAll()
lastEvents() should be(Set(OnError(in, TE("test"))))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
in.cancel() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate failure while push is in flight" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(false)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
out.fail(TE("test"))
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(true)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
step()
lastEvents() should be(Set(OnNext(in, 0)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(true)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
in.grab() should ===(0)
an[IllegalArgumentException] should be thrownBy { in.grab() }
step()
lastEvents() should be(Set(OnError(in, TE("test"))))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
out.complete() // This should have no effect now
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"propagate failure while push is in flight and keep ungrabbed element" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
out.fail(TE("test"))
step()
lastEvents() should be(Set(OnNext(in, 0)))
step()
lastEvents() should be(Set(OnError(in, TE("test"))))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(true)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
in.grab() should ===(0)
}
"ignore pull while failing" in new PortTestSetup {
out.fail(TE("test"))
in.pull()
stepAll()
lastEvents() should be(Set(OnError(in, TE("test"))))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore any failure completion if they are concurrent (cancel first)" in new PortTestSetup {
in.cancel()
out.fail(TE("test"))
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore any failure completion if they are concurrent (complete first)" in new PortTestSetup {
out.fail(TE("test"))
in.cancel()
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore failure from a push-then-fail if cancelled while in flight" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
out.fail(TE("test"))
in.cancel()
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
"ignore failure from a push-then-fail if cancelled after onPush" in new PortTestSetup {
in.pull()
stepAll()
clearEvents()
out.push(0)
out.fail(TE("test"))
step()
lastEvents() should be(Set(OnNext(in, 0)))
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(true)
in.hasBeenPulled should be(false)
in.isClosed should be(false)
an[IllegalArgumentException] should be thrownBy { out.push(0) }
in.grab() should ===(0)
in.cancel()
stepAll()
lastEvents() should be(Set.empty)
out.isAvailable should be(false)
out.isClosed should be(true)
in.isAvailable should be(false)
in.hasBeenPulled should be(false)
in.isClosed should be(true)
an[IllegalArgumentException] should be thrownBy { in.pull() }
an[IllegalArgumentException] should be thrownBy { out.push(0) }
an[IllegalArgumentException] should be thrownBy { in.grab() }
}
}
}

View file

@ -7,7 +7,7 @@ import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl.{ Merge, Broadcast, Balance, Zip }
import GraphInterpreter._
class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
class GraphInterpreterSpec extends GraphInterpreterSpecKit {
import GraphStages._
"GraphInterpreter" must {
@ -273,8 +273,6 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
lastEvents() should ===(Set(OnNext(sink2, 2)))
}
"implement bidi-stage" in pending
"implement non-divergent cycle" in new TestSetup {
val source = new UpstreamProbe[Int]("source")
val sink = new DownstreamProbe[Int]("sink")

View file

@ -3,11 +3,14 @@
*/
package akka.stream.impl.fusing
import akka.stream.{ NoMaterializer, Outlet, Inlet, Shape }
import akka.stream.impl.fusing.GraphInterpreter.{ GraphAssembly, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
import akka.event.Logging
import akka.stream._
import akka.stream.impl.fusing.GraphInterpreter.{ Failed, GraphAssembly, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
import akka.stream.stage.{ InHandler, OutHandler, GraphStage, GraphStageLogic }
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.Utils.TE
trait GraphInterpreterSpecKit {
trait GraphInterpreterSpecKit extends AkkaSpec {
sealed trait TestEvent {
def source: GraphStageLogic
@ -20,11 +23,17 @@ trait GraphInterpreterSpecKit {
case class RequestOne(source: GraphStageLogic) extends TestEvent
case class RequestAnother(source: GraphStageLogic) extends TestEvent
case class PreStart(source: GraphStageLogic) extends TestEvent
case class PostStop(source: GraphStageLogic) extends TestEvent
abstract class TestSetup {
protected var lastEvent: Set[TestEvent] = Set.empty
private var _interpreter: GraphInterpreter = _
protected def interpreter: GraphInterpreter = _interpreter
def stepAll(): Unit = interpreter.execute(eventLimit = Int.MaxValue)
def step(): Unit = interpreter.execute(eventLimit = 1)
class AssemblyBuilder(stages: Seq[GraphStage[_ <: Shape]]) {
var upstreams = Vector.empty[(UpstreamBoundaryStageLogic[_], Inlet[_])]
var downstreams = Vector.empty[(Outlet[_], DownstreamBoundaryStageLogic[_])]
@ -59,7 +68,7 @@ trait GraphInterpreterSpecKit {
(Vector.fill(upstreams.size)(-1) ++ outOwners).toArray)
val (inHandlers, outHandlers, logics, _) = assembly.materialize()
_interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ())
_interpreter = new GraphInterpreter(assembly, NoMaterializer, Logging(system, classOf[TestSetup]), inHandlers, outHandlers, logics, (_, _, _) ())
for ((upstream, i) upstreams.zipWithIndex) {
_interpreter.attachUpstreamBoundary(i, upstream._1)
@ -75,7 +84,7 @@ trait GraphInterpreterSpecKit {
def manualInit(assembly: GraphAssembly): Unit = {
val (inHandlers, outHandlers, logics, _) = assembly.materialize()
_interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ())
_interpreter = new GraphInterpreter(assembly, NoMaterializer, Logging(system, classOf[TestSetup]), inHandlers, outHandlers, logics, (_, _, _) ())
}
def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq)
@ -90,10 +99,11 @@ trait GraphInterpreterSpecKit {
class UpstreamProbe[T](override val toString: String) extends UpstreamBoundaryStageLogic[T] {
val out = Outlet[T]("out")
out.id = 0
setHandler(out, new OutHandler {
override def onPull(): Unit = lastEvent += RequestOne(UpstreamProbe.this)
override def onDownstreamFinish() = lastEvent += Cancel(UpstreamProbe.this)
override def onDownstreamFinish(): Unit = lastEvent += Cancel(UpstreamProbe.this)
})
def onNext(elem: T, eventLimit: Int = Int.MaxValue): Unit = {
@ -105,11 +115,12 @@ trait GraphInterpreterSpecKit {
class DownstreamProbe[T](override val toString: String) extends DownstreamBoundaryStageLogic[T] {
val in = Inlet[T]("in")
in.id = 0
setHandler(in, new InHandler {
override def onPush(): Unit = lastEvent += OnNext(DownstreamProbe.this, grab(in))
override def onUpstreamFinish() = lastEvent += OnComplete(DownstreamProbe.this)
override def onUpstreamFailure(ex: Throwable) = OnError(DownstreamProbe.this, ex)
override def onUpstreamFinish(): Unit = lastEvent += OnComplete(DownstreamProbe.this)
override def onUpstreamFailure(ex: Throwable): Unit = lastEvent += OnError(DownstreamProbe.this, ex)
})
def requestOne(eventLimit: Int = Int.MaxValue): Unit = {
@ -146,19 +157,19 @@ trait GraphInterpreterSpecKit {
setHandler(in, new InHandler {
// Modified onPush that does not grab() automatically the element. This accesses some internals.
override def onPush(): Unit =
lastEvent +=
OnNext(
DownstreamPortProbe.this,
interpreter.connectionStates(inToConn(in)))
override def onPush(): Unit = {
val internalEvent = interpreter.connectionSlots(inToConn(in.id))
override def onUpstreamFinish() = lastEvent += OnComplete(DownstreamPortProbe.this)
override def onUpstreamFailure(ex: Throwable) = OnError(DownstreamPortProbe.this, ex)
})
internalEvent match {
case Failed(_, elem) lastEvent += OnNext(DownstreamPortProbe.this, elem)
case elem lastEvent += OnNext(DownstreamPortProbe.this, elem)
}
}
def stepAll(): Unit = interpreter.execute(eventLimit = Int.MaxValue)
def step(): Unit = interpreter.execute(eventLimit = 1)
override def onUpstreamFinish() = lastEvent += OnComplete(DownstreamPortProbe.this)
override def onUpstreamFailure(ex: Throwable) = lastEvent += OnError(DownstreamPortProbe.this, ex)
})
}
private val assembly = GraphAssembly(
stages = Array.empty,
@ -173,4 +184,73 @@ trait GraphInterpreterSpecKit {
interpreter.init()
}
abstract class FailingStageSetup(initFailOnNextEvent: Boolean = false) extends TestSetup {
val upstream = new UpstreamPortProbe[Int]
val downstream = new DownstreamPortProbe[Int]
private var _failOnNextEvent: Boolean = initFailOnNextEvent
private var _failOnPostStop: Boolean = false
def failOnNextEvent(): Unit = _failOnNextEvent = true
def failOnPostStop(): Unit = _failOnPostStop = true
def testException = TE("test")
private val stagein = Inlet[Int]("sandwitch.in")
private val stageout = Outlet[Int]("sandwitch.out")
private val stageshape = FlowShape(stagein, stageout)
// Must be lazy because I turned this stage "inside-out" therefore changing initialization order
// to make tests a bit more readable
lazy val stage: GraphStageLogic = new GraphStageLogic(stageshape) {
private def mayFail(task: Unit): Unit = {
if (!_failOnNextEvent) task
else {
_failOnNextEvent = false
throw testException
}
}
setHandler(stagein, new InHandler {
override def onPush(): Unit = mayFail(push(stageout, grab(stagein)))
override def onUpstreamFinish(): Unit = mayFail(completeStage())
override def onUpstreamFailure(ex: Throwable): Unit = mayFail(failStage(ex))
})
setHandler(stageout, new OutHandler {
override def onPull(): Unit = mayFail(pull(stagein))
override def onDownstreamFinish(): Unit = mayFail(completeStage())
})
override def preStart(): Unit = mayFail(lastEvent += PreStart(stage))
override def postStop(): Unit =
if (!_failOnPostStop) lastEvent += PostStop(stage)
else throw testException
override def toString = "stage"
}
private val sandwitchStage = new GraphStage[FlowShape[Int, Int]] {
override def shape = stageshape
override def createLogic: GraphStageLogic = stage
}
class UpstreamPortProbe[T] extends UpstreamProbe[T]("upstreamPort") {
def push(elem: T): Unit = push(out, elem)
def complete(): Unit = complete(out)
def fail(ex: Throwable): Unit = fail(out, ex)
}
class DownstreamPortProbe[T] extends DownstreamProbe[T]("upstreamPort") {
def pull(): Unit = pull(in)
def cancel(): Unit = cancel(in)
}
builder(sandwitchStage)
.connect(upstream, stagein)
.connect(stageout, downstream)
.init()
}
}

View file

@ -49,7 +49,7 @@ class UnzipWith1[In, [#A1#]](unzipper: In ⇒ ([#A1#])) extends GraphStage[FanOu
[#def out0: Outlet[A1] = shape.out0#
]
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
var pendingCount = 1
var downstreamRunning = 1

View file

@ -30,7 +30,7 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape
[#val in0: Inlet[A1] = shape.in0#
]
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
var pending = 1
private def pushAll(): Unit = push(out, zipper([#grab(in0)#]))

View file

@ -15,8 +15,13 @@ import scala.collection.JavaConverters._
* for otherwise unreasonable existential types.
*/
sealed abstract class InPort { self: Inlet[_]
final override def hashCode: Int = System.identityHashCode(this)
final override def hashCode: Int = super.hashCode
final override def equals(that: Any): Boolean = this eq that.asInstanceOf[AnyRef]
/**
* INTERNAL API
*/
private[stream] var id: Int = -1
}
/**
* An output port of a StreamLayout.Module. This type logically belongs
@ -25,8 +30,13 @@ sealed abstract class InPort { self: Inlet[_] ⇒
* for otherwise unreasonable existential types.
*/
sealed abstract class OutPort { self: Outlet[_]
final override def hashCode: Int = System.identityHashCode(this)
final override def hashCode: Int = super.hashCode
final override def equals(that: Any): Boolean = this eq that.asInstanceOf[AnyRef]
/**
* INTERNAL API
*/
private[stream] var id: Int = -1
}
/**

View file

@ -100,6 +100,7 @@ private[stream] object ActorGraphInterpreter {
private var batchRemaining = requestBatchSize
val out: Outlet[Any] = Outlet[Any]("UpstreamBoundary" + id)
out.id = 0
private def dequeue(): Any = {
val elem = inputBuffer(nextInputElementCursor)
@ -195,6 +196,7 @@ private[stream] object ActorGraphInterpreter {
class ActorOutputBoundary(actor: ActorRef, id: Int) extends DownstreamBoundaryStageLogic[Any] {
val in: Inlet[Any] = Inlet[Any]("UpstreamBoundary" + id)
in.id = 0
private var exposedPublisher: ActorPublisher[Any] = _
@ -299,6 +301,7 @@ private[stream] class ActorGraphInterpreter(
val interpreter = new GraphInterpreter(
assembly,
mat,
Logging(this),
inHandlers,
outHandlers,
logics,

View file

@ -3,6 +3,8 @@
*/
package akka.stream.impl.fusing
import akka.event.LoggingAdapter
import akka.io.Tcp.Closed
import akka.stream.stage._
import akka.stream.{ Materializer, Shape, Inlet, Outlet }
@ -19,37 +21,35 @@ private[stream] object GraphInterpreter {
*/
final val Debug = false
final val NoEvent = -1
final val Boundary = -1
final val InReady = 1
final val Pulling = 2
final val Pushing = 4
final val OutReady = 8
final val InClosed = 16
final val OutClosed = 32
final val InFailed = 64
final val PullStartFlip = 3 // 0011
final val PullEndFlip = 10 // 1010
final val PushStartFlip = 12 //1100
final val PushEndFlip = 5 //0101
/**
* Marker object that indicates that a port holds no element since it was already grabbed. The port is still pullable,
* but there is no more element to grab.
*/
case object Empty
final case class Failed(ex: Throwable, previousElem: Any)
sealed trait ConnectionState
case object Pulled extends ConnectionState
sealed trait HasElementState
sealed trait CompletingState extends ConnectionState
final case class CompletedHasElement(element: Any) extends CompletingState with HasElementState
final case class PushCompleted(element: Any) extends CompletingState with HasElementState
case object Completed extends CompletingState
case object Cancelled extends CompletingState
final case class Failed(ex: Throwable) extends CompletingState
val NoEvent = -1
val Boundary = -1
sealed trait PortState
case object InFlight extends PortState
case object Available extends PortState
case object Closed extends PortState
abstract class UpstreamBoundaryStageLogic[T] extends GraphStageLogic {
abstract class UpstreamBoundaryStageLogic[T] extends GraphStageLogic(inCount = 0, outCount = 1) {
def out: Outlet[T]
}
abstract class DownstreamBoundaryStageLogic[T] extends GraphStageLogic {
abstract class DownstreamBoundaryStageLogic[T] extends GraphStageLogic(inCount = 1, outCount = 0) {
def in: Inlet[T]
}
@ -107,27 +107,52 @@ private[stream] object GraphInterpreter {
val logics = Array.ofDim[GraphStageLogic](stages.length)
var finalMat: Any = ()
for (i stages.indices) {
// FIXME: Support for materialized values in fused islands is not yet figured out!
val (logic, mat) = stages(i).createLogicAndMaterializedValue
// FIXME: Current temporary hack to support non-fused stages. If there is one stage that will be under index 0.
if (i == 0) finalMat = mat
var i = 0
while (i < stages.length) {
// Port initialization loops, these must come first
val shape = stages(i).asInstanceOf[GraphStageWithMaterializedValue[Shape, _]].shape
logics(i) = logic
var idx = 0
val inletItr = shape.inlets.iterator
while (inletItr.hasNext) {
val inlet = inletItr.next()
require(inlet.id == -1 || inlet.id == idx, s"Inlet $inlet was shared among multiple stages. This is illegal.")
inlet.id = idx
idx += 1
}
idx = 0
val outletItr = shape.outlets.iterator
while (outletItr.hasNext) {
val outlet = outletItr.next()
require(outlet.id == -1 || outlet.id == idx, s"Outlet $outlet was shared among multiple stages. This is illegal.")
outlet.id = idx
idx += 1
}
// FIXME: Support for materialized values in fused islands is not yet figured out!
val logicAndMat = stages(i).createLogicAndMaterializedValue
// FIXME: Current temporary hack to support non-fused stages. If there is one stage that will be under index 0.
if (i == 0) finalMat = logicAndMat._2
logics(i) = logicAndMat._1
i += 1
}
val inHandlers = Array.ofDim[InHandler](connectionCount)
val outHandlers = Array.ofDim[OutHandler](connectionCount)
for (i 0 until connectionCount) {
i = 0
while (i < connectionCount) {
if (ins(i) ne null) {
inHandlers(i) = logics(inOwners(i)).inHandlers(ins(i))
logics(inOwners(i)).inToConn += ins(i) -> i
inHandlers(i) = logics(inOwners(i)).inHandlers(ins(i).id)
logics(inOwners(i)).inToConn(ins(i).id) = i
}
if (outs(i) ne null) {
outHandlers(i) = logics(outOwners(i)).outHandlers(outs(i))
logics(outOwners(i)).outToConn += outs(i) -> i
outHandlers(i) = logics(outOwners(i)).outHandlers(outs(i).id)
logics(outOwners(i)).outToConn(outs(i).id) = i
}
i += 1
}
(inHandlers, outHandlers, logics, finalMat)
@ -168,12 +193,10 @@ private[stream] object GraphInterpreter {
* connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair),
* while in the practical sense a connection is a number which represents slots in certain arrays.
* In particular
* - connectionStates is a mapping from a connection id to a current (or future) state of the connection
* - inStates is a mapping from a connection to a [[akka.stream.impl.fusing.GraphInterpreter.PortState]]
* that indicates whether the input corresponding
* to the connection is currently pullable or completed
* - outStates is a mapping from a connection to a [[akka.stream.impl.fusing.GraphInterpreter.PortState]]
* that indicates whether the input corresponding to the connection is currently pushable or completed
* - portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this
* connection. This bitfield is used to decode the event that is in-flight.
* - connectionSlots is a mapping from a connection id to a potential element or exception that accompanies the
* event encoded in the portStates bitfield
* - inHandlers is a mapping from a connection id to the [[InHandler]] instance that handles the events corresponding
* to the input port of the connection
* - outHandlers is a mapping from a connection id to the [[OutHandler]] instance that handles the events corresponding
@ -181,20 +204,39 @@ private[stream] object GraphInterpreter {
*
* On top of these lookup tables there is an eventQueue, represented as a circular buffer of integers. The integers
* it contains represents connections that have pending events to be processed. The pending event itself is encoded
* in the connectionStates table. This implies that there can be only one event in flight for a given connection, which
* is true in almost all cases, except a complete-after-push which is therefore handled with a special event
* [[GraphInterpreter#PushCompleted]].
* in the portStates bitfield. This implies that there can be only one event in flight for a given connection, which
* is true in almost all cases, except a complete-after-push or fail-after-push.
*
* The layout of the portStates bitfield is the following:
*
* |- state machn.-| Only one bit is hot among these bits
* 64 32 16 | 8 4 2 1 |
* +---+---+---|---+---+---+---|
* | | | | | | |
* | | | | | | | From the following flags only one is active in any given time. These bits encode
* | | | | | | | state machine states, and they are "moved" around using XOR masks to keep other bits
* | | | | | | | intact.
* | | | | | | |
* | | | | | | +- InReady: The input port is ready to be pulled
* | | | | | +----- Pulling: A pull is active, but have not arrived yet (queued)
* | | | | +--------- Pushing: A push is active, but have not arrived yet (queued)
* | | | +------------- OutReady: The output port is ready to be pushed
* | | |
* | | +----------------- InClosed: The input port is closed and will not receive any events.
* | | A push might be still in flight which will be then processed first.
* | +--------------------- OutClosed: The output port is closed and will not receive any events.
* +------------------------- InFailed: Always set in conjunction with InClosed. Indicates that the close event
* is a failure
*
* Sending an event is usually the following sequence:
* - An action is requested by a stage logic (push, pull, complete, etc.)
* - the availability of the port is set on the sender side to Limbo (inStates or outStates)
* - the scheduled event is put in the slot of the connection in the connectionStates table
* - the state machine in portStates is transitioned from a ready state to a pending event
* - the id of the affected connection is enqueued
*
* Receiving an event is usually the following sequence:
* - id of connection to be processed is dequeued
* - the type of the event is determined by the object in the corresponding connectionStates slot
* - the availability of the port is set on the receiver side to be Available (inStates or outStates)
* - the type of the event is determined from the bits set on portStates
* - the state machine in portStates is transitioned to a ready state
* - using the inHandlers/outHandlers table the corresponding callback is called on the stage logic.
*
* Because of the FIFO construction of the queue the interpreter is fair, i.e. a pending event is always executed
@ -205,52 +247,48 @@ private[stream] object GraphInterpreter {
private[stream] final class GraphInterpreter(
private val assembly: GraphInterpreter.GraphAssembly,
val materializer: Materializer,
val log: LoggingAdapter,
val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection
val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection
val logics: Array[GraphStageLogic], // Array of stage logics
val onAsyncInput: (GraphStageLogic, Any, (Any) Unit) Unit) {
import GraphInterpreter._
// Maintains the next event (and state) of the connection.
// Technically the connection cannot be considered being in the state that is encoded here before the enqueued
// connection event has been processed. The inStates and outStates arrays usually protect access to this
// field while it is in transient state.
val connectionStates = Array.fill[Any](assembly.connectionCount)(Empty)
// Maintains additional information for events, basically elements in-flight, or failure.
// Other events are encoded in the portStates bitfield.
val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty)
// Indicates whether the input port is pullable. After pulling it becomes false
// Be aware that when inAvailable goes to false outAvailable does not become true immediately, only after
// the corresponding event in the queue has been processed
val inStates = Array.fill[PortState](assembly.connectionCount)(Available)
// Bitfield encoding pending events and various states for efficient querying and updates. See the documentation
// of the class for a full description.
val portStates = Array.fill[Int](assembly.connectionCount)(InReady)
// Indicates whether the output port is pushable. After pushing it becomes false
// Be aware that when inAvailable goes to false outAvailable does not become true immediately, only after
// the corresponding event in the queue has been processed
val outStates = Array.fill[PortState](assembly.connectionCount)(InFlight)
private[this] var activeStageId = Boundary
// The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be
// completed
private var runningStages = assembly.stages.length
private[this] var runningStages = assembly.stages.length
// Counts how many active connections a stage has. Once it reaches zero, the stage is automatically stopped.
private val shutdownCounter = Array.tabulate(assembly.stages.length) { i
private[this] val shutdownCounter = Array.tabulate(assembly.stages.length) { i
val shape = assembly.stages(i).shape.asInstanceOf[Shape]
shape.inlets.size + shape.outlets.size
}
// An event queue implemented as a circular buffer
private val eventQueue = Array.ofDim[Int](256)
private val mask = eventQueue.length - 1
private var queueHead: Int = 0
private var queueTail: Int = 0
// FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue
private[this] val eventQueue = Array.ofDim[Int](1 << Integer.highestOneBit(assembly.connectionCount))
private[this] val mask = eventQueue.length - 1
private[this] var queueHead: Int = 0
private[this] var queueTail: Int = 0
/**
* Assign the boundary logic to a given connection. This will serve as the interface to the external world
* (outside the interpreter) to process and inject events.
*/
def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit = {
logic.outToConn += logic.out -> connection
logic.outToConn(logic.out.id) = connection
logic.interpreter = this
outHandlers(connection) = logic.outHandlers.head._2
outHandlers(connection) = logic.outHandlers(0)
}
/**
@ -258,9 +296,9 @@ private[stream] final class GraphInterpreter(
* (outside the interpreter) to process and inject events.
*/
def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit = {
logic.inToConn += logic.in -> connection
logic.inToConn(logic.in.id) = connection
logic.interpreter = this
inHandlers(connection) = logic.inHandlers.head._2
inHandlers(connection) = logic.inHandlers(0)
}
/**
@ -279,10 +317,15 @@ private[stream] final class GraphInterpreter(
def init(): Unit = {
var i = 0
while (i < logics.length) {
logics(i).stageId = i
logics(i).interpreter = this
logics(i).beforePreStart()
logics(i).preStart()
val logic = logics(i)
logic.stageId = i
logic.interpreter = this
try {
logic.beforePreStart()
logic.preStart()
} catch {
case NonFatal(e) logic.failStage(e)
}
i += 1
}
}
@ -293,10 +336,7 @@ private[stream] final class GraphInterpreter(
def finish(): Unit = {
var i = 0
while (i < logics.length) {
if (!isStageCompleted(i)) {
logics(i).postStop()
logics(i).afterPostStop()
}
if (!isStageCompleted(i)) finalizeStage(logics(i))
i += 1
}
}
@ -323,16 +363,8 @@ private[stream] final class GraphInterpreter(
try processEvent(connection)
catch {
case NonFatal(e)
val stageId = connectionStates(connection) match {
case Failed(ex) throw new IllegalStateException("Double fault. Failure while handling failure.", e)
case Pulled assembly.outOwners(connection)
case Completed assembly.inOwners(connection)
case Cancelled assembly.outOwners(connection)
case PushCompleted(elem) assembly.inOwners(connection)
case pushedElem assembly.inOwners(connection)
}
if (stageId == Boundary) throw e
else logics(stageId).failStage(e)
if (activeStageId == Boundary) throw e
else logics(activeStageId).failStage(e)
}
eventsRemaining -= 1
if (eventsRemaining > 0) connection = dequeue()
@ -344,61 +376,62 @@ private[stream] final class GraphInterpreter(
private def processEvent(connection: Int): Unit = {
def processElement(elem: Any): Unit = {
if (!isStageCompleted(assembly.inOwners(connection))) {
if (GraphInterpreter.Debug) println(s"PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, $elem")
inStates(connection) = Available
activeStageId = assembly.inOwners(connection)
portStates(connection) ^= PushEndFlip
inHandlers(connection).onPush()
}
}
connectionStates(connection) match {
case Pulled
if (!isStageCompleted(assembly.outOwners(connection))) {
val code = portStates(connection)
// Manual fast decoding, fast paths are PUSH and PULL
// PUSH
if ((code & (Pushing | InClosed | OutClosed)) == Pushing) {
processElement(connectionSlots(connection))
// PULL
} else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) {
if (GraphInterpreter.Debug) println(s"PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)}")
outStates(connection) = Available
portStates(connection) ^= PullEndFlip
activeStageId = assembly.outOwners(connection)
outHandlers(connection).onPull()
}
case Completed | CompletedHasElement(_)
val stageId = assembly.inOwners(connection)
if (!isStageCompleted(stageId) && inStates(connection) != Closed) {
if (GraphInterpreter.Debug) println(s"COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)}")
inStates(connection) = Closed
inHandlers(connection).onUpstreamFinish()
completeConnection(stageId)
}
case Failed(ex)
val stageId = assembly.inOwners(connection)
if (!isStageCompleted(stageId) && inStates(connection) != Closed) {
if (GraphInterpreter.Debug) println(s"FAIL ${outOwnerName(connection)} -> ${inOwnerName(connection)}")
inStates(connection) = Closed
inHandlers(connection).onUpstreamFailure(ex)
completeConnection(stageId)
}
case Cancelled
// CANCEL
} else if ((code & (OutClosed | InClosed)) == InClosed) {
val stageId = assembly.outOwners(connection)
if (!isStageCompleted(stageId) && outStates(connection) != Closed) {
if (GraphInterpreter.Debug) println(s"CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)}")
outStates(connection) = Closed
portStates(connection) |= OutClosed
activeStageId = assembly.outOwners(connection)
outHandlers(connection).onDownstreamFinish()
completeConnection(stageId)
}
case PushCompleted(elem)
} else if ((code & (OutClosed | InClosed)) == OutClosed) {
// COMPLETIONS
val stageId = assembly.inOwners(connection)
if (!isStageCompleted(stageId) && inStates(connection) != Closed) {
inStates(connection) = Available
connectionStates(connection) = elem
processElement(elem)
val elemAfter = connectionStates(connection)
if (elemAfter == Empty) enqueue(connection, Completed)
else enqueue(connection, CompletedHasElement(elemAfter))
if ((code & Pushing) == 0) {
// Normal completion (no push pending)
if (GraphInterpreter.Debug) println(s"COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)}")
portStates(connection) |= InClosed
activeStageId = assembly.inOwners(connection)
if ((portStates(connection) & InFailed) == 0) inHandlers(connection).onUpstreamFinish()
else inHandlers(connection).onUpstreamFailure(connectionSlots(connection).asInstanceOf[Failed].ex)
completeConnection(stageId)
} else {
connectionStates(connection) = Completed
// Push is pending, first process push, then re-enqueue closing event
// Non-failure case
val code = portStates(connection) & (InClosed | InFailed)
if (code == 0) {
processElement(connectionSlots(connection))
enqueue(connection)
} else if (code == InFailed) {
// Failure case
processElement(connectionSlots(connection).asInstanceOf[Failed].previousElem)
enqueue(connection)
}
}
case pushedElem processElement(pushedElem)
}
}
private def dequeue(): Int = {
@ -412,29 +445,14 @@ private[stream] final class GraphInterpreter(
}
}
private def enqueue(connection: Int, event: Any): Unit = {
connectionStates(connection) = event
private def enqueue(connection: Int): Unit = {
eventQueue(queueTail & mask) = connection
queueTail += 1
}
// Returns true if a connection has been completed *or if the completion event is already enqueued*. This is useful
// to prevent redundant completion events in case of concurrent invocation on both sides of the connection.
// I.e. when one side already enqueued the completion event, then the other side will not enqueue the event since
// there is noone to process it anymore.
def isConnectionCompleting(connection: Int): Boolean = connectionStates(connection).isInstanceOf[CompletingState]
// Returns true if the given stage is alredy completed
def isStageCompleted(stageId: Int): Boolean = stageId != Boundary && shutdownCounter(stageId) == 0
private def isPushInFlight(connection: Int): Boolean =
(inStates(connection) == InFlight) && // Other side has not been notified
hasElement(connection)
private def hasElement(connection: Int): Boolean =
!connectionStates(connection).isInstanceOf[ConnectionState] &&
connectionStates(connection) != Empty
// Register that a connection in which the given stage participated has been completed and therefore the stage
// itself might stop, too.
private def completeConnection(stageId: Int): Unit = {
@ -445,53 +463,66 @@ private[stream] final class GraphInterpreter(
// This was the last active connection keeping this stage alive
if (activeConnections == 1) {
runningStages -= 1
logics(stageId).postStop()
logics(stageId).afterPostStop()
finalizeStage(logics(stageId))
}
}
}
}
private def finalizeStage(logic: GraphStageLogic): Unit = {
try {
logic.postStop()
logic.afterPostStop()
} catch {
case NonFatal(e)
log.error(s"Error during postStop in [${assembly.stages(logic.stageId)}]", e)
}
}
private[stream] def push(connection: Int, elem: Any): Unit = {
if (!(inStates(connection) eq Closed)) {
outStates(connection) = InFlight
enqueue(connection, elem)
if ((portStates(connection) & InClosed) == 0) {
portStates(connection) ^= PushStartFlip
connectionSlots(connection) = elem
enqueue(connection)
}
}
private[stream] def pull(connection: Int): Unit = {
if (!(outStates(connection) eq Closed)) {
inStates(connection) = InFlight
enqueue(connection, Pulled)
if ((portStates(connection) & OutClosed) == 0) {
portStates(connection) ^= PullStartFlip
enqueue(connection)
}
}
private[stream] def complete(connection: Int): Unit = {
outStates(connection) = Closed
if (!isConnectionCompleting(connection) && (inStates(connection) ne Closed)) {
if (hasElement(connection)) {
// There is a pending push, we change the signal to be a PushCompleted (there can be only one signal in flight
// for a connection)
if (inStates(connection) == InFlight)
connectionStates(connection) = PushCompleted(connectionStates(connection))
else enqueue(connection, CompletedHasElement(connectionStates(connection)))
} else enqueue(connection, Completed)
val currentState = portStates(connection)
portStates(connection) = portStates(connection) | OutClosed
if ((currentState & InClosed) == 0) {
if ((currentState & Pushing) != 0) {} // FIXME: Fold into previous condition
else if (connectionSlots(connection) != Empty)
enqueue(connection)
else
enqueue(connection)
}
completeConnection(assembly.outOwners(connection))
}
private[stream] def fail(connection: Int, ex: Throwable): Unit = {
outStates(connection) = Closed
if (!isConnectionCompleting(connection) && (inStates(connection) ne Closed))
enqueue(connection, Failed(ex))
portStates(connection) |= (OutClosed | InFailed)
if ((portStates(connection) & InClosed) == 0) {
connectionSlots(connection) = Failed(ex, connectionSlots(connection))
enqueue(connection)
}
completeConnection(assembly.outOwners(connection))
}
private[stream] def cancel(connection: Int): Unit = {
inStates(connection) = Closed
if (!isConnectionCompleting(connection) && (outStates(connection) ne Closed))
enqueue(connection, Cancelled)
portStates(connection) |= InClosed
if ((portStates(connection) & OutClosed) == 0) {
connectionSlots(connection) = Empty
enqueue(connection)
}
completeConnection(assembly.inOwners(connection))
}

View file

@ -25,10 +25,9 @@ object GraphStages {
val out = Outlet[T]("out")
override val shape = FlowShape(in, out)
protected abstract class SimpleLinearStageLogic extends GraphStageLogic {
protected abstract class SimpleLinearStageLogic extends GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = completeStage()
})
}
@ -36,11 +35,9 @@ object GraphStages {
class Identity[T] extends SimpleLinearGraphStage[T] {
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic {
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic() {
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
}
@ -52,7 +49,7 @@ object GraphStages {
val out = Outlet[T]("out")
override val shape = FlowShape(in, out)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
var initialized = false
setHandler(in, new InHandler {
@ -108,7 +105,7 @@ object GraphStages {
val cancelled = new AtomicBoolean(false)
val cancellable = new TickSourceCancellable(cancelled)
val logic = new GraphStageLogic {
val logic = new GraphStageLogic(shape) {
override def preStart() = {
schedulePeriodicallyWithInitialDelay("TickTimer", initialDelay, interval)
val callback = getAsyncCallback[Unit]((_) {

View file

@ -744,7 +744,7 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS
val out = Outlet[immutable.Seq[T]]("out")
val shape = FlowShape(in, out)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
private val buf: VectorBuilder[T] = new VectorBuilder
// True if:
// - buf is nonEmpty

View file

@ -129,7 +129,7 @@ object Timeouts {
override def toString = "IdleTimeoutBidi"
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
private var nextDeadline: Deadline = Deadline.now + timeout
setHandler(in1, new InHandler {

View file

@ -40,7 +40,7 @@ class Merge[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]]
val out: Outlet[T] = Outlet[T]("Merge.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
private var initialized = false
private val pendingQueue = Array.ofDim[Inlet[T]](inputPorts)
@ -143,7 +143,7 @@ class MergePreferred[T] private (secondaryPorts: Int) extends GraphStage[MergePr
def preferred: Inlet[T] = shape.preferred
// FIXME: Factor out common stuff with Merge
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
private var initialized = false
private val pendingQueue = Array.ofDim[Inlet[T]](secondaryPorts)
@ -252,7 +252,7 @@ class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends G
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Broadcast.out" + i))
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
private var pendingCount = outputPorts
private val pending = Array.fill[Boolean](outputPorts)(true)
private var downstreamsRunning = outputPorts
@ -350,7 +350,7 @@ class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends G
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Balance.out" + i))
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
override def createLogic: GraphStageLogic = new GraphStageLogic {
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
private val pendingQueue = Array.ofDim[Outlet[T]](outputPorts)
private var pendingHead: Int = 0
private var pendingTail: Int = 0
@ -522,7 +522,7 @@ class Concat[T](inputCount: Int) extends GraphStage[UniformFanInShape[T, T]] {
val out: Outlet[T] = Outlet[T]("Concat.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def createLogic = new GraphStageLogic {
override def createLogic = new GraphStageLogic(shape) {
var activeStream: Int = 0
{

View file

@ -85,10 +85,12 @@ private object TimerMessages {
* The stage logic is always stopped once all its input and output ports have been closed, i.e. it is not possible to
* keep the stage alive for further processing once it does not have any open ports.
*/
abstract class GraphStageLogic {
abstract class GraphStageLogic private[stream] (inCount: Int, outCount: Int) {
import GraphInterpreter._
import TimerMessages._
def this(shape: Shape) = this(shape.inlets.size, shape.outlets.size)
private val keyToTimers = mutable.Map[Any, Timer]()
private val timerIdGen = Iterator from 1
private var queuedTimerEvents = List.empty[Queued]
@ -109,20 +111,20 @@ abstract class GraphStageLogic {
/**
* INTERNAL API
*/
private[stream] var inHandlers = scala.collection.Map.empty[Inlet[_], InHandler]
private[stream] var inHandlers = Array.ofDim[InHandler](inCount)
/**
* INTERNAL API
*/
private[stream] var outHandlers = scala.collection.Map.empty[Outlet[_], OutHandler]
private[stream] var outHandlers = Array.ofDim[OutHandler](outCount)
/**
* INTERNAL API
*/
private[stream] var inToConn = scala.collection.Map.empty[Inlet[_], Int]
private[stream] var inToConn = Array.ofDim[Int](inHandlers.length)
/**
* INTERNAL API
*/
private[stream] var outToConn = scala.collection.Map.empty[Outlet[_], Int]
private[stream] var outToConn = Array.ofDim[Int](outHandlers.length)
/**
* INTERNAL API
@ -134,18 +136,18 @@ abstract class GraphStageLogic {
*/
final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
handler.ownerStageLogic = this
inHandlers += in -> handler
inHandlers(in.id) = handler
}
/**
* Assigns callbacks for the events for an [[Outlet]]
*/
final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
handler.ownerStageLogic = this
outHandlers += out -> handler
outHandlers(out.id) = handler
}
private def conn[T](in: Inlet[T]): Int = inToConn(in)
private def conn[T](out: Outlet[T]): Int = outToConn(out)
private def conn[T](in: Inlet[T]): Int = inToConn(in.id)
private def conn[T](out: Outlet[T]): Int = outToConn(out.id)
/**
* Requests an element on the given port. Calling this method twice before an element arrived will fail.
@ -153,9 +155,13 @@ abstract class GraphStageLogic {
* query whether pull is allowed to be called or not.
*/
final protected def pull[T](in: Inlet[T]): Unit = {
require(!hasBeenPulled(in), "Cannot pull port twice")
require(!isClosed(in), "Cannot pull closed port")
if ((interpreter.portStates(conn(in)) & (InReady | InClosed)) == InReady) {
interpreter.pull(conn(in))
} else {
// Detailed error information should not add overhead to the hot path
require(!isClosed(in), "Cannot pull closed port")
require(!hasBeenPulled(in), "Cannot pull port twice")
}
}
/**
@ -171,17 +177,20 @@ abstract class GraphStageLogic {
* The method [[isAvailable()]] can be used to query if the port has an element that can be grabbed or not.
*/
final protected def grab[T](in: Inlet[T]): T = {
require(isAvailable(in), "Cannot get element from already empty input port")
val connection = conn(in)
val elem = interpreter.connectionStates(connection)
elem match {
case CompletedHasElement(realElem)
interpreter.connectionStates(connection) = Completed
realElem.asInstanceOf[T]
case _
interpreter.connectionStates(connection) = Empty
// Fast path
if ((interpreter.portStates(connection) & (InReady | InFailed)) == InReady &&
(interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty)) {
val elem = interpreter.connectionSlots(connection)
interpreter.connectionSlots(connection) = Empty
elem.asInstanceOf[T]
} else {
// Slow path
require(isAvailable(in), "Cannot get element from already empty input port")
val failed = interpreter.connectionSlots(connection).asInstanceOf[Failed]
val elem = failed.previousElem.asInstanceOf[T]
interpreter.connectionSlots(connection) = Failed(failed.ex, Empty)
elem
}
}
@ -189,7 +198,7 @@ abstract class GraphStageLogic {
* Indicates whether there is already a pending pull for the given input port. If this method returns true
* then [[isAvailable()]] must return false for that same port.
*/
final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = interpreter.inStates(conn(in)) eq InFlight
final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & (InReady | InClosed)) == 0
/**
* Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the
@ -199,19 +208,26 @@ abstract class GraphStageLogic {
*/
final protected def isAvailable[T](in: Inlet[T]): Boolean = {
val connection = conn(in)
val state = interpreter.connectionStates(connection)
val arrived = interpreter.inStates(connection) ne InFlight
val hasElementState = state.isInstanceOf[HasElementState]
val rawElement = (state != Empty) && !state.isInstanceOf[ConnectionState]
val normalArrived = (interpreter.portStates(conn(in)) & (InReady | InFailed)) == InReady
arrived && (hasElementState || rawElement)
// Fast path
if (normalArrived) interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty
else {
// Slow path on failure
if ((interpreter.portStates(conn(in)) & (InReady | InFailed)) == (InReady | InFailed)) {
interpreter.connectionSlots(connection) match {
case Failed(_, elem) elem.asInstanceOf[AnyRef] ne Empty
case _ false // This can only be Empty actually (if a cancel was concurrent with a failure)
}
} else false
}
}
/**
* Indicates whether the port has been closed. A closed port cannot be pulled.
*/
final protected def isClosed[T](in: Inlet[T]): Boolean = interpreter.inStates(conn(in)) eq Closed
final protected def isClosed[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & InClosed) != 0
/**
* Emits an element through the given output port. Calling this method twice before a [[pull()]] has been arrived
@ -219,9 +235,13 @@ abstract class GraphStageLogic {
* used to check if the port is ready to be pushed or not.
*/
final protected def push[T](out: Outlet[T], elem: T): Unit = {
if ((interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady) {
interpreter.push(conn(out), elem)
} else {
// Detailed error information should not add overhead to the hot path
require(isAvailable(out), "Cannot push port twice")
require(!isClosed(out), "Cannot pull closed port")
interpreter.push(conn(out), elem)
}
}
/**
@ -239,8 +259,16 @@ abstract class GraphStageLogic {
* then stops the stage, then [[postStop()]] is called.
*/
final def completeStage(): Unit = {
inToConn.valuesIterator.foreach(interpreter.cancel)
outToConn.valuesIterator.foreach(interpreter.complete)
var i = 0
while (i < inToConn.length) {
interpreter.cancel(inToConn(i))
i += 1
}
i = 0
while (i < outToConn.length) {
interpreter.complete(outToConn(i))
i += 1
}
}
/**
@ -248,19 +276,28 @@ abstract class GraphStageLogic {
* then stops the stage, then [[postStop()]] is called.
*/
final def failStage(ex: Throwable): Unit = {
inToConn.valuesIterator.foreach(interpreter.cancel)
outToConn.valuesIterator.foreach(interpreter.fail(_, ex))
var i = 0
while (i < inToConn.length) {
interpreter.cancel(inToConn(i))
i += 1
}
i = 0
while (i < outToConn.length) {
interpreter.fail(outToConn(i), ex)
i += 1
}
}
/**
* Return true if the given output port is ready to be pushed.
*/
final def isAvailable[T](out: Outlet[T]): Boolean = interpreter.outStates(conn(out)) eq Available
final def isAvailable[T](out: Outlet[T]): Boolean =
(interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady
/**
* Indicates whether the port has been closed. A closed port cannot be pushed.
*/
final protected def isClosed[T](out: Outlet[T]): Boolean = interpreter.outStates(conn(out)) eq Closed
final protected def isClosed[T](out: Outlet[T]): Boolean = (interpreter.portStates(conn(out)) & OutClosed) != 0
/**
* Obtain a callback object that can be used asynchronously to re-enter the