Merge pull request #21002 from drewhk/wip-interpreter-improvements-drewhk
WIP - Interpreter improvements
This commit is contained in:
commit
acc5223d38
21 changed files with 796 additions and 263 deletions
|
|
@ -0,0 +1,331 @@
|
|||
/**
|
||||
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream
|
||||
|
||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.impl.fusing.GraphStages
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.stage._
|
||||
import org.openjdk.jmh.annotations.{ OperationsPerInvocation, _ }
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object FusedGraphsBenchmark {
|
||||
val ElementCount = 100 * 1000
|
||||
|
||||
@volatile var blackhole: org.openjdk.jmh.infra.Blackhole = _
|
||||
}
|
||||
|
||||
// Just to avoid allocations and still have a way to do some work in stages. The value itself does not matter
|
||||
// so no issues with sharing (the result does not make any sense, but hey)
|
||||
class MutableElement(var value: Int)
|
||||
|
||||
class TestSource(elems: Array[MutableElement]) extends GraphStage[SourceShape[MutableElement]] {
|
||||
val out = Outlet[MutableElement]("TestSource.out")
|
||||
override val shape = SourceShape(out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
|
||||
private[this] var left = FusedGraphsBenchmark.ElementCount - 1
|
||||
|
||||
override def onPull(): Unit = {
|
||||
if (left >= 0) {
|
||||
push(out, elems(left))
|
||||
left -= 1
|
||||
} else completeStage()
|
||||
}
|
||||
|
||||
setHandler(out, this)
|
||||
}
|
||||
}
|
||||
|
||||
class JitSafeCompletionLatch extends GraphStageWithMaterializedValue[SinkShape[MutableElement], CountDownLatch] {
|
||||
val in = Inlet[MutableElement]("JitSafeCompletionLatch.in")
|
||||
override val shape = SinkShape(in)
|
||||
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, CountDownLatch) = {
|
||||
val latch = new CountDownLatch(1)
|
||||
val logic = new GraphStageLogic(shape) with InHandler {
|
||||
private[this] var sum = 0
|
||||
|
||||
override def preStart(): Unit = pull(in)
|
||||
override def onPush(): Unit = {
|
||||
sum += grab(in).value
|
||||
pull(in)
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
// Do not ignore work along the chain
|
||||
FusedGraphsBenchmark.blackhole.consume(sum)
|
||||
latch.countDown()
|
||||
completeStage()
|
||||
}
|
||||
|
||||
setHandler(in, this)
|
||||
}
|
||||
|
||||
(logic, latch)
|
||||
}
|
||||
}
|
||||
|
||||
class IdentityStage extends GraphStage[FlowShape[MutableElement, MutableElement]] {
|
||||
val in = Inlet[MutableElement]("Identity.in")
|
||||
val out = Outlet[MutableElement]("Identity.out")
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def onPush(): Unit = push(out, grab(in))
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||
@BenchmarkMode(Array(Mode.Throughput))
|
||||
class FusedGraphsBenchmark {
|
||||
import FusedGraphsBenchmark._
|
||||
|
||||
implicit val system = ActorSystem("test")
|
||||
var materializer: ActorMaterializer = _
|
||||
var testElements: Array[MutableElement] = _
|
||||
|
||||
var singleIdentity: RunnableGraph[CountDownLatch] = _
|
||||
var chainOfIdentities: RunnableGraph[CountDownLatch] = _
|
||||
var singleMap: RunnableGraph[CountDownLatch] = _
|
||||
var chainOfMaps: RunnableGraph[CountDownLatch] = _
|
||||
var repeatTakeMapAndFold: RunnableGraph[CountDownLatch] = _
|
||||
var singleBuffer: RunnableGraph[CountDownLatch] = _
|
||||
var chainOfBuffers: RunnableGraph[CountDownLatch] = _
|
||||
var broadcastZip: RunnableGraph[CountDownLatch] = _
|
||||
var balanceMerge: RunnableGraph[CountDownLatch] = _
|
||||
var broadcastZipBalanceMerge: RunnableGraph[CountDownLatch] = _
|
||||
|
||||
@Setup
|
||||
def setup(): Unit = {
|
||||
val settings = ActorMaterializerSettings(system)
|
||||
.withFuzzing(false)
|
||||
.withSyncProcessingLimit(Int.MaxValue)
|
||||
.withAutoFusing(false) // We fuse manually in this test in the setup
|
||||
|
||||
materializer = ActorMaterializer(settings)
|
||||
testElements = Array.fill(ElementCount)(new MutableElement(0))
|
||||
val addFunc = (x: MutableElement) => { x.value += 1; x }
|
||||
|
||||
val testSource = Source.fromGraph(new TestSource(testElements))
|
||||
val testSink = Sink.fromGraph(new JitSafeCompletionLatch)
|
||||
|
||||
def fuse(r: RunnableGraph[CountDownLatch]): RunnableGraph[CountDownLatch] = {
|
||||
RunnableGraph.fromGraph(Fusing.aggressive(r))
|
||||
}
|
||||
|
||||
val identityStage = new IdentityStage
|
||||
|
||||
singleIdentity =
|
||||
fuse(
|
||||
testSource
|
||||
.via(identityStage)
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
|
||||
chainOfIdentities =
|
||||
fuse(
|
||||
testSource
|
||||
.via(identityStage)
|
||||
.via(identityStage)
|
||||
.via(identityStage)
|
||||
.via(identityStage)
|
||||
.via(identityStage)
|
||||
.via(identityStage)
|
||||
.via(identityStage)
|
||||
.via(identityStage)
|
||||
.via(identityStage)
|
||||
.via(identityStage)
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
|
||||
singleMap =
|
||||
fuse(
|
||||
testSource
|
||||
.map(addFunc)
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
|
||||
chainOfMaps =
|
||||
fuse(
|
||||
testSource
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
|
||||
repeatTakeMapAndFold =
|
||||
fuse(
|
||||
Source.repeat(new MutableElement(0))
|
||||
.take(ElementCount)
|
||||
.map(addFunc)
|
||||
.map(addFunc)
|
||||
.fold(new MutableElement(0))((acc, x) => { acc.value += x.value; acc })
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
|
||||
singleBuffer =
|
||||
fuse(
|
||||
testSource
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
|
||||
chainOfBuffers =
|
||||
fuse(
|
||||
testSource
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.buffer(10, OverflowStrategy.backpressure)
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
|
||||
val broadcastZipFlow: Flow[MutableElement, MutableElement, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
val bcast = b.add(Broadcast[MutableElement](2))
|
||||
val zip = b.add(Zip[MutableElement, MutableElement]())
|
||||
|
||||
bcast ~> zip.in0
|
||||
bcast ~> zip.in1
|
||||
|
||||
FlowShape(bcast.in, zip.out.map(_._1).outlet)
|
||||
})
|
||||
|
||||
val balanceMergeFlow: Flow[MutableElement, MutableElement, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
val balance = b.add(Balance[MutableElement](2))
|
||||
val merge = b.add(Merge[MutableElement](2))
|
||||
|
||||
balance ~> merge
|
||||
balance ~> merge
|
||||
|
||||
FlowShape(balance.in, merge.out)
|
||||
})
|
||||
|
||||
broadcastZip =
|
||||
fuse(
|
||||
testSource
|
||||
.via(broadcastZipFlow)
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
|
||||
balanceMerge =
|
||||
fuse(
|
||||
testSource
|
||||
.via(balanceMergeFlow)
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
|
||||
broadcastZipBalanceMerge =
|
||||
fuse(
|
||||
testSource
|
||||
.via(broadcastZipFlow)
|
||||
.via(balanceMergeFlow)
|
||||
.toMat(testSink)(Keep.right)
|
||||
)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def single_identity(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
singleIdentity.run()(materializer).await()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def chain_of_identities(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
chainOfIdentities.run()(materializer).await()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def single_map(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
singleMap.run()(materializer).await()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def chain_of_maps(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
chainOfMaps.run()(materializer).await()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def repeat_take_map_and_fold(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
repeatTakeMapAndFold.run()(materializer).await()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def single_buffer(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
singleBuffer.run()(materializer).await()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def chain_of_buffers(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
chainOfBuffers.run()(materializer).await()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def broadcast_zip(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
broadcastZip.run()(materializer).await()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def balance_merge(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
balanceMerge.run()(materializer).await()
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(100 * 1000)
|
||||
def boradcast_zip_balance_merge(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = {
|
||||
FusedGraphsBenchmark.blackhole = blackhole
|
||||
broadcastZipBalanceMerge.run()(materializer).await()
|
||||
}
|
||||
|
||||
@TearDown
|
||||
def shutdown(): Unit = {
|
||||
Await.result(system.terminate(), 5.seconds)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -100,8 +100,15 @@ public class RecipeByteStrings extends RecipeTest {
|
|||
@Override
|
||||
public void onUpstreamFinish() throws Exception {
|
||||
if (buffer.isEmpty()) completeStage();
|
||||
// elements left in buffer, keep accepting downstream pulls
|
||||
// and push from buffer until buffer is emitted
|
||||
else {
|
||||
// There are elements left in buffer, so
|
||||
// we keep accepting downstream pulls and push from buffer until emptied.
|
||||
//
|
||||
// It might be though, that the upstream finished while it was pulled, in which
|
||||
// case we will not get an onPull from the downstream, because we already had one.
|
||||
// In that case we need to emit from the buffer.
|
||||
if (isAvailable(out)) emitChunk();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,8 +42,15 @@ class RecipeByteStrings extends RecipeSpec {
|
|||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (buffer.isEmpty) completeStage()
|
||||
// elements left in buffer, keep accepting downstream pulls
|
||||
// and push from buffer until buffer is emitted
|
||||
else {
|
||||
// There are elements left in buffer, so
|
||||
// we keep accepting downstream pulls and push from buffer until emptied.
|
||||
//
|
||||
// It might be though, that the upstream finished while it was pulled, in which
|
||||
// case we will not get an onPull from the downstream, because we already had one.
|
||||
// In that case we need to emit from the buffer.
|
||||
if (isAvailable(out)) emitChunk()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -206,8 +206,8 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
.connect(passThrough.out, Downstream)
|
||||
.init()
|
||||
|
||||
interpreter.complete(0)
|
||||
interpreter.cancel(1)
|
||||
interpreter.complete(interpreter.connections(0))
|
||||
interpreter.cancel(interpreter.connections(1))
|
||||
interpreter.execute(2)
|
||||
|
||||
expectMsg("postStop2")
|
||||
|
|
|
|||
|
|
@ -368,11 +368,8 @@ class GraphInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
|
||||
sink.requestOne(eventLimit = 0)
|
||||
source.onComplete(eventLimit = 3)
|
||||
lastEvents() should ===(Set(OnNext(sink, "C")))
|
||||
|
||||
sink.requestOne()
|
||||
lastEvents() should ===(Set(OnComplete(sink)))
|
||||
|
||||
// OnComplete arrives early due to push chasing
|
||||
lastEvents() should ===(Set(OnNext(sink, "C"), OnComplete(sink)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -71,17 +71,17 @@ trait GraphInterpreterSpecKit extends StreamSpec {
|
|||
def init(): Unit = {
|
||||
val assembly = buildAssembly()
|
||||
|
||||
val (inHandlers, outHandlers, logics) =
|
||||
val (conns, logics) =
|
||||
assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ())
|
||||
_interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics,
|
||||
_interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, logics, conns,
|
||||
(_, _, _) ⇒ (), fuzzingMode = false, null)
|
||||
|
||||
for ((upstream, i) ← upstreams.zipWithIndex) {
|
||||
_interpreter.attachUpstreamBoundary(i, upstream._1)
|
||||
_interpreter.attachUpstreamBoundary(conns(i), upstream._1)
|
||||
}
|
||||
|
||||
for ((downstream, i) ← downstreams.zipWithIndex) {
|
||||
_interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2)
|
||||
_interpreter.attachDownstreamBoundary(conns(i + upstreams.size + connections.size), downstream._2)
|
||||
}
|
||||
|
||||
_interpreter.init(null)
|
||||
|
|
@ -89,9 +89,9 @@ trait GraphInterpreterSpecKit extends StreamSpec {
|
|||
}
|
||||
|
||||
def manualInit(assembly: GraphAssembly): Unit = {
|
||||
val (inHandlers, outHandlers, logics) =
|
||||
val (connections, logics) =
|
||||
assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ())
|
||||
_interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics,
|
||||
_interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, logics, connections,
|
||||
(_, _, _) ⇒ (), fuzzingMode = false, null)
|
||||
}
|
||||
|
||||
|
|
@ -202,7 +202,7 @@ trait GraphInterpreterSpecKit extends StreamSpec {
|
|||
|
||||
// Modified onPush that does not grab() automatically the element. This accesses some internals.
|
||||
override def onPush(): Unit = {
|
||||
val internalEvent = interpreter.connectionSlots(portToConn(in.id))
|
||||
val internalEvent = portToConn(in.id).slot
|
||||
|
||||
internalEvent match {
|
||||
case Failed(_, elem) ⇒ lastEvent += OnNext(DownstreamPortProbe.this, elem)
|
||||
|
|
@ -224,8 +224,8 @@ trait GraphInterpreterSpecKit extends StreamSpec {
|
|||
outOwners = Array(-1))
|
||||
|
||||
manualInit(assembly)
|
||||
interpreter.attachDownstreamBoundary(0, in)
|
||||
interpreter.attachUpstreamBoundary(0, out)
|
||||
interpreter.attachDownstreamBoundary(interpreter.connections(0), in)
|
||||
interpreter.attachUpstreamBoundary(interpreter.connections(0), out)
|
||||
interpreter.init(null)
|
||||
}
|
||||
|
||||
|
|
@ -310,8 +310,6 @@ trait GraphInterpreterSpecKit extends StreamSpec {
|
|||
abstract class OneBoundedSetup[T](_ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder {
|
||||
val ops = _ops.toArray
|
||||
|
||||
def this(op: Seq[Stage[_, _]], dummy: Int = 42) = this(op.map(_.toGS): _*)
|
||||
|
||||
val upstream = new UpstreamOneBoundedProbe[T]
|
||||
val downstream = new DownstreamOneBoundedPortProbe[T]
|
||||
var lastEvent = Set.empty[TestEvent]
|
||||
|
|
|
|||
|
|
@ -7,8 +7,8 @@ import akka.stream.impl.ConstantFun
|
|||
import akka.stream.stage._
|
||||
import akka.stream.testkit.StreamSpec
|
||||
import akka.testkit.EventFilter
|
||||
|
||||
import akka.stream._
|
||||
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
|
||||
|
||||
class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
||||
import Supervision.stoppingDecider
|
||||
|
|
@ -24,7 +24,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
|
||||
"Interpreter" must {
|
||||
|
||||
"implement map correctly" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) {
|
||||
"implement map correctly" in new OneBoundedSetup[Int](Map((x: Int) ⇒ x + 1)) {
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
downstream.requestOne()
|
||||
|
|
@ -43,10 +43,10 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
lastEvents() should be(Set(OnComplete))
|
||||
}
|
||||
|
||||
"implement chain of maps correctly" in new OneBoundedSetup[Int](Seq(
|
||||
Map((x: Int) ⇒ x + 1, stoppingDecider),
|
||||
Map((x: Int) ⇒ x * 2, stoppingDecider),
|
||||
Map((x: Int) ⇒ x + 1, stoppingDecider))) {
|
||||
"implement chain of maps correctly" in new OneBoundedSetup[Int](
|
||||
Map((x: Int) ⇒ x + 1),
|
||||
Map((x: Int) ⇒ x * 2),
|
||||
Map((x: Int) ⇒ x + 1)) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
lastEvents() should be(Set(Cancel))
|
||||
}
|
||||
|
||||
"work with only boundary ops" in new OneBoundedSetup[Int](Seq.empty) {
|
||||
"work with only boundary ops" in new OneBoundedSetup[Int]() {
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
downstream.requestOne()
|
||||
|
|
@ -149,7 +149,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
"implement take inside a chain" in new OneBoundedSetup[Int](
|
||||
Filter((x: Int) ⇒ x != 0),
|
||||
takeTwo,
|
||||
Map((x: Int) ⇒ x + 1, stoppingDecider).toGS) {
|
||||
Map((x: Int) ⇒ x + 1)) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
|
|
@ -433,11 +433,11 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
|
||||
// Note, the new interpreter has no jumpback table, still did not want to remove the test
|
||||
"work with jumpback table and completed elements" in new OneBoundedSetup[Int](
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS,
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS,
|
||||
Map((x: Int) ⇒ x),
|
||||
Map((x: Int) ⇒ x),
|
||||
KeepGoing(),
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS,
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS) {
|
||||
Map((x: Int) ⇒ x),
|
||||
Map((x: Int) ⇒ x)) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
|
|
@ -464,8 +464,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
|
||||
}
|
||||
|
||||
"work with pushAndFinish if upstream completes with pushAndFinish" in new OneBoundedSetup[Int](Seq(
|
||||
new PushFinishStage)) {
|
||||
"work with pushAndFinish if upstream completes with pushAndFinish" in new OneBoundedSetup[Int](new PushFinishStage) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
|
|
@ -476,10 +475,10 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
lastEvents() should be(Set(OnNext(0), OnComplete))
|
||||
}
|
||||
|
||||
"work with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[Int](Seq(
|
||||
Map((x: Any) ⇒ x, stoppingDecider),
|
||||
"work with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[Int](
|
||||
Map((x: Any) ⇒ x),
|
||||
new PushFinishStage,
|
||||
Map((x: Any) ⇒ x, stoppingDecider))) {
|
||||
Map((x: Any) ⇒ x)) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
|
|
@ -491,7 +490,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
}
|
||||
|
||||
"work with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new OneBoundedSetup[Int](
|
||||
(new PushFinishStage).toGS,
|
||||
new PushFinishStage,
|
||||
Fold(0, (x: Int, y: Int) ⇒ x + y)) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
|
@ -503,11 +502,18 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
lastEvents() should be(Set(OnNext(1), OnComplete))
|
||||
}
|
||||
|
||||
"report error if pull is called while op is terminating" in new OneBoundedSetup[Int](Seq(new PushPullStage[Any, Any] {
|
||||
override def onPull(ctx: Context[Any]): SyncDirective = ctx.pull()
|
||||
override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pull()
|
||||
override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = ctx.absorbTermination()
|
||||
})) {
|
||||
"report error if pull is called while op is terminating" in new OneBoundedSetup[Int](
|
||||
new SimpleLinearGraphStage[Any] {
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def onPush(): Unit = pull(in)
|
||||
override def onPull(): Unit = pull(in)
|
||||
override def onUpstreamFinish(): Unit = if (!hasBeenPulled(in)) pull(in)
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
) {
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
downstream.requestOne()
|
||||
|
|
@ -558,8 +564,8 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = ctx.absorbTermination()
|
||||
}
|
||||
|
||||
"not allow absorbTermination from onDownstreamFinish()" in new OneBoundedSetup[Int](Seq(
|
||||
new InvalidAbsorbTermination)) {
|
||||
// This test must be kept since it tests the compatibility layer, which while is deprecated it is still here.
|
||||
"not allow absorbTermination from onDownstreamFinish()" in new OneBoundedSetup[Int]((new InvalidAbsorbTermination).toGS) {
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
EventFilter[UnsupportedOperationException]("It is not allowed to call absorbTermination() from onDownstreamFinish.", occurrences = 1).intercept {
|
||||
|
|
@ -635,16 +641,19 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
override val shape: FlowShape[T, T] = FlowShape(in, out)
|
||||
}
|
||||
|
||||
// This test is related to issue #17351
|
||||
private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends PushStage[Any, Any] {
|
||||
override def onPush(elem: Any, ctx: Context[Any]): SyncDirective =
|
||||
ctx.pushAndFinish(elem)
|
||||
private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends SimpleLinearGraphStage[Any] {
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
override def onPush(): Unit = {
|
||||
push(out, grab(in))
|
||||
completeStage()
|
||||
}
|
||||
override def onPull(): Unit = pull(in)
|
||||
override def onUpstreamFinish(): Unit = failStage(akka.stream.testkit.Utils.TE("Cannot happen"))
|
||||
override def postStop(): Unit = onPostStop()
|
||||
|
||||
override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective =
|
||||
ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen"))
|
||||
|
||||
override def postStop(): Unit =
|
||||
onPostStop()
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
val halfLength = chainLength / 2
|
||||
val repetition = 100
|
||||
|
||||
val map = Map((x: Int) ⇒ x + 1, stoppingDecider).toGS
|
||||
val map = Map((x: Int) ⇒ x + 1)
|
||||
|
||||
// GraphStages can be reused
|
||||
val dropOne = Drop(1)
|
||||
|
|
|
|||
|
|
@ -6,11 +6,9 @@ package akka.stream.impl.fusing
|
|||
import akka.stream.testkit.StreamSpec
|
||||
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.stream.Supervision
|
||||
import akka.stream.stage.Context
|
||||
import akka.stream.stage.PushPullStage
|
||||
import akka.stream.stage.Stage
|
||||
import akka.stream.stage.SyncDirective
|
||||
import akka.stream.{ ActorAttributes, Attributes, Supervision }
|
||||
import akka.stream.stage._
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit {
|
||||
import Supervision.stoppingDecider
|
||||
|
|
@ -21,16 +19,22 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
|
|||
override def toString = "TE"
|
||||
}
|
||||
|
||||
class ResumingMap[In, Out](_f: In ⇒ Out) extends Map(_f) {
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
super.createLogic(inheritedAttributes.and(ActorAttributes.supervisionStrategy(resumingDecider)))
|
||||
}
|
||||
|
||||
"Interpreter error handling" must {
|
||||
|
||||
"handle external failure" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) {
|
||||
"handle external failure" in new OneBoundedSetup[Int](Map((x: Int) ⇒ x + 1)) {
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
upstream.onError(TE)
|
||||
lastEvents() should be(Set(OnError(TE)))
|
||||
}
|
||||
|
||||
"emit failure when op throws" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, stoppingDecider))) {
|
||||
"emit failure when op throws" in new OneBoundedSetup[Int](Map((x: Int) ⇒ if (x == 0) throw TE else x)) {
|
||||
downstream.requestOne()
|
||||
lastEvents() should be(Set(RequestOne))
|
||||
upstream.onNext(2)
|
||||
|
|
@ -42,10 +46,10 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
|
|||
lastEvents() should be(Set(Cancel, OnError(TE)))
|
||||
}
|
||||
|
||||
"emit failure when op throws in middle of the chain" in new OneBoundedSetup[Int](Seq(
|
||||
Map((x: Int) ⇒ x + 1, stoppingDecider),
|
||||
Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, stoppingDecider),
|
||||
Map((x: Int) ⇒ x + 100, stoppingDecider))) {
|
||||
"emit failure when op throws in middle of the chain" in new OneBoundedSetup[Int](
|
||||
Map((x: Int) ⇒ x + 1),
|
||||
Map((x: Int) ⇒ if (x == 0) throw TE else x + 10),
|
||||
Map((x: Int) ⇒ x + 100)) {
|
||||
|
||||
downstream.requestOne()
|
||||
lastEvents() should be(Set(RequestOne))
|
||||
|
|
@ -58,7 +62,9 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
|
|||
lastEvents() should be(Set(Cancel, OnError(TE)))
|
||||
}
|
||||
|
||||
"resume when Map throws" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, resumingDecider))) {
|
||||
"resume when Map throws" in new OneBoundedSetup[Int](
|
||||
new ResumingMap((x: Int) ⇒ if (x == 0) throw TE else x)
|
||||
) {
|
||||
downstream.requestOne()
|
||||
lastEvents() should be(Set(RequestOne))
|
||||
upstream.onNext(2)
|
||||
|
|
@ -82,10 +88,11 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
|
|||
lastEvents() should be(Set(OnNext(4)))
|
||||
}
|
||||
|
||||
"resume when Map throws in middle of the chain" in new OneBoundedSetup[Int](Seq(
|
||||
Map((x: Int) ⇒ x + 1, resumingDecider),
|
||||
Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, resumingDecider),
|
||||
Map((x: Int) ⇒ x + 100, resumingDecider))) {
|
||||
"resume when Map throws in middle of the chain" in new OneBoundedSetup[Int](
|
||||
new ResumingMap((x: Int) ⇒ x + 1),
|
||||
new ResumingMap((x: Int) ⇒ if (x == 0) throw TE else x + 10),
|
||||
new ResumingMap((x: Int) ⇒ x + 100)
|
||||
) {
|
||||
|
||||
downstream.requestOne()
|
||||
lastEvents() should be(Set(RequestOne))
|
||||
|
|
@ -102,8 +109,8 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
|
|||
}
|
||||
|
||||
"resume when Map throws before Grouped" in new OneBoundedSetup[Int](
|
||||
Map((x: Int) ⇒ x + 1, resumingDecider).toGS,
|
||||
Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider).toGS,
|
||||
new ResumingMap((x: Int) ⇒ x + 1),
|
||||
new ResumingMap((x: Int) ⇒ if (x <= 0) throw TE else x + 10),
|
||||
Grouped(3)) {
|
||||
|
||||
downstream.requestOne()
|
||||
|
|
@ -122,8 +129,8 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit
|
|||
}
|
||||
|
||||
"complete after resume when Map throws before Grouped" in new OneBoundedSetup[Int](
|
||||
Map((x: Int) ⇒ x + 1, resumingDecider).toGS,
|
||||
Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider).toGS,
|
||||
new ResumingMap((x: Int) ⇒ x + 1),
|
||||
new ResumingMap((x: Int) ⇒ if (x <= 0) throw TE else x + 10),
|
||||
Grouped(1000)) {
|
||||
|
||||
downstream.requestOne()
|
||||
|
|
|
|||
|
|
@ -16,14 +16,14 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
|
||||
"work in the happy case" in {
|
||||
val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq(
|
||||
Map((x: Int) ⇒ x + 1, stoppingDecider).toGS)).iterator
|
||||
Map((x: Int) ⇒ x + 1))).iterator
|
||||
|
||||
itr.toSeq should be(2 to 11)
|
||||
}
|
||||
|
||||
"hasNext should not affect elements" in {
|
||||
val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq(
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS)).iterator
|
||||
Map((x: Int) ⇒ x))).iterator
|
||||
|
||||
itr.hasNext should be(true)
|
||||
itr.hasNext should be(true)
|
||||
|
|
@ -42,7 +42,7 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
|
||||
"throw exceptions on empty iterator" in {
|
||||
val itr = new IteratorInterpreter[Int, Int](List(1).iterator, Seq(
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS)).iterator
|
||||
Map((x: Int) ⇒ x))).iterator
|
||||
|
||||
itr.next() should be(1)
|
||||
a[NoSuchElementException] should be thrownBy { itr.next() }
|
||||
|
|
@ -50,7 +50,7 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
|
||||
"throw exceptions when op in chain throws" in {
|
||||
val itr = new IteratorInterpreter[Int, Int](List(1, 2, 3).iterator, Seq(
|
||||
Map((n: Int) ⇒ if (n == 2) throw new ArithmeticException() else n, stoppingDecider).toGS)).iterator
|
||||
Map((n: Int) ⇒ if (n == 2) throw new ArithmeticException() else n))).iterator
|
||||
|
||||
itr.next() should be(1)
|
||||
itr.hasNext should be(true)
|
||||
|
|
@ -60,7 +60,7 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
|
||||
"work with an empty iterator" in {
|
||||
val itr = new IteratorInterpreter[Int, Int](Iterator.empty, Seq(
|
||||
Map((x: Int) ⇒ x + 1, stoppingDecider).toGS)).iterator
|
||||
Map((x: Int) ⇒ x + 1))).iterator
|
||||
|
||||
itr.hasNext should be(false)
|
||||
a[NoSuchElementException] should be thrownBy { itr.next() }
|
||||
|
|
|
|||
|
|
@ -84,9 +84,9 @@ class LifecycleInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
}
|
||||
|
||||
"onError when preStart fails with stages after" in new OneBoundedSetup[String](
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS,
|
||||
Map((x: Int) ⇒ x),
|
||||
PreStartFailer(() ⇒ throw TE("Boom!")),
|
||||
Map((x: Int) ⇒ x, stoppingDecider).toGS) {
|
||||
Map((x: Int) ⇒ x)) {
|
||||
lastEvents() should ===(Set(Cancel, OnError(TE("Boom!"))))
|
||||
}
|
||||
|
||||
|
|
@ -112,9 +112,9 @@ class LifecycleInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
|
|||
}
|
||||
|
||||
"postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[String](
|
||||
Map((x: Any) ⇒ x, stoppingDecider).toGS,
|
||||
Map((x: Any) ⇒ x),
|
||||
new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"),
|
||||
Map((x: Any) ⇒ x, stoppingDecider).toGS) {
|
||||
Map((x: Any) ⇒ x)) {
|
||||
|
||||
lastEvents() should be(Set.empty)
|
||||
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re
|
|||
}
|
||||
|
||||
val faultyFlow: Flow[Any, Any, NotUsed] ⇒ Flow[Any, Any, NotUsed] = in ⇒ in.via({
|
||||
val stage = new PushPullGraphStage((_) ⇒ fusing.Map({ x: Any ⇒ x }, stoppingDecider), Attributes.none)
|
||||
val stage = fusing.Map({ x: Any ⇒ x })
|
||||
|
||||
val assembly = new GraphAssembly(
|
||||
Array(stage),
|
||||
|
|
@ -65,10 +65,10 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re
|
|||
Array(null, stage.shape.out),
|
||||
Array(-1, 0))
|
||||
|
||||
val (inHandlers, outHandlers, logics) =
|
||||
val (connections, logics) =
|
||||
assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ())
|
||||
|
||||
val shell = new GraphInterpreterShell(assembly, inHandlers, outHandlers, logics, stage.shape, settings,
|
||||
val shell = new GraphInterpreterShell(assembly, connections, logics, stage.shape, settings,
|
||||
materializer.asInstanceOf[ActorMaterializerImpl])
|
||||
|
||||
val props = Props(new BrokenActorInterpreter(shell, "a3"))
|
||||
|
|
|
|||
|
|
@ -211,9 +211,9 @@ private[akka] case class ActorMaterializerImpl(
|
|||
|
||||
private def matGraph(graph: GraphModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
|
||||
val calculatedSettings = effectiveSettings(effectiveAttributes)
|
||||
val (inHandlers, outHandlers, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)
|
||||
val (connections, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)
|
||||
|
||||
val shell = new GraphInterpreterShell(graph.assembly, inHandlers, outHandlers, logics, graph.shape,
|
||||
val shell = new GraphInterpreterShell(graph.assembly, connections, logics, graph.shape,
|
||||
calculatedSettings, ActorMaterializerImpl.this)
|
||||
|
||||
val impl =
|
||||
|
|
|
|||
|
|
@ -156,10 +156,6 @@ object Stages {
|
|||
|
||||
}
|
||||
|
||||
final case class Map[In, Out](f: In ⇒ Out, attributes: Attributes = map) extends SymbolicStage[In, Out] {
|
||||
override def create(attr: Attributes): Stage[In, Out] = fusing.Map(f, supervision(attr))
|
||||
}
|
||||
|
||||
final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy, attributes: Attributes = buffer) extends SymbolicStage[T, T] {
|
||||
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
|
||||
override def create(attr: Attributes): Stage[T, T] = fusing.Buffer(size, overflowStrategy)
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import akka.stream._
|
|||
import akka.stream.impl.ReactiveStreamsCompliance._
|
||||
import akka.stream.impl.StreamLayout.{ AtomicModule, CompositeModule, CopiedModule, Module }
|
||||
import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ }
|
||||
import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, GraphAssembly, UpstreamBoundaryStageLogic }
|
||||
import akka.stream.impl.fusing.GraphInterpreter.{ Connection, DownstreamBoundaryStageLogic, GraphAssembly, UpstreamBoundaryStageLogic }
|
||||
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
|
||||
import org.reactivestreams.{ Subscriber, Subscription }
|
||||
|
||||
|
|
@ -308,8 +308,7 @@ object ActorGraphInterpreter {
|
|||
*/
|
||||
final class GraphInterpreterShell(
|
||||
assembly: GraphAssembly,
|
||||
inHandlers: Array[InHandler],
|
||||
outHandlers: Array[OutHandler],
|
||||
connections: Array[Connection],
|
||||
logics: Array[GraphStageLogic],
|
||||
shape: Shape,
|
||||
settings: ActorMaterializerSettings,
|
||||
|
|
@ -322,7 +321,7 @@ final class GraphInterpreterShell(
|
|||
|
||||
private var enqueueToShortCircuit: (Any) ⇒ Unit = _
|
||||
|
||||
lazy val interpreter: GraphInterpreter = new GraphInterpreter(assembly, mat, log, inHandlers, outHandlers, logics,
|
||||
lazy val interpreter: GraphInterpreter = new GraphInterpreter(assembly, mat, log, logics, connections,
|
||||
(logic, event, handler) ⇒ {
|
||||
val asyncInput = AsyncInput(this, logic, event, handler)
|
||||
val currentInterpreter = GraphInterpreter.currentInterpreterOrNull
|
||||
|
|
@ -366,7 +365,7 @@ final class GraphInterpreterShell(
|
|||
while (i < inputs.length) {
|
||||
val in = new BatchingActorInputBoundary(settings.maxInputBufferSize, i)
|
||||
inputs(i) = in
|
||||
interpreter.attachUpstreamBoundary(i, in)
|
||||
interpreter.attachUpstreamBoundary(connections(i), in)
|
||||
i += 1
|
||||
}
|
||||
val offset = assembly.connectionCount - outputs.length
|
||||
|
|
@ -374,7 +373,7 @@ final class GraphInterpreterShell(
|
|||
while (i < outputs.length) {
|
||||
val out = new ActorOutputBoundary(self, this, i)
|
||||
outputs(i) = out
|
||||
interpreter.attachDownstreamBoundary(i + offset, out)
|
||||
interpreter.attachDownstreamBoundary(connections(i + offset), out)
|
||||
i += 1
|
||||
}
|
||||
interpreter.init(subMat)
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ object GraphInterpreter {
|
|||
*/
|
||||
final val Debug = false
|
||||
|
||||
final val NoEvent = -1
|
||||
final val NoEvent = null
|
||||
final val Boundary = -1
|
||||
|
||||
final val InReady = 1
|
||||
|
|
@ -47,6 +47,8 @@ object GraphInterpreter {
|
|||
final val KeepGoingFlag = 0x4000000
|
||||
final val KeepGoingMask = 0x3ffffff
|
||||
|
||||
final val ChaseLimit = 16
|
||||
|
||||
/**
|
||||
* Marker object that indicates that a port holds no element since it was already grabbed. The port is still pullable,
|
||||
* but there is no more element to grab.
|
||||
|
|
@ -64,6 +66,37 @@ object GraphInterpreter {
|
|||
|
||||
val singleNoAttribute: Array[Attributes] = Array(Attributes.none)
|
||||
|
||||
/**
|
||||
* INERNAL API
|
||||
*
|
||||
* Contains all the necessary information for the GraphInterpreter to be able to implement a connection
|
||||
* between an output and input ports.
|
||||
*
|
||||
* @param id Identifier of the connection. Corresponds to the array slot in the [[GraphAssembly]]
|
||||
* @param inOwnerId Identifier of the owner of the input side of the connection. Corresponds to the array slot in
|
||||
* the [[GraphAssembly]]
|
||||
* @param inOwner The stage logic that corresponds to the input side of the connection.
|
||||
* @param outOwnerId Identifier of the owner of the output side of the connection. Corresponds to the array slot
|
||||
* in the [[GraphAssembly]]
|
||||
* @param outOwner The stage logic that corresponds to the output side of the connection.
|
||||
* @param inHandler The handler that contains the callback for input events.
|
||||
* @param outHandler The handler that contains the callback for output events.
|
||||
*/
|
||||
final class Connection(
|
||||
val id: Int,
|
||||
val inOwnerId: Int,
|
||||
val inOwner: GraphStageLogic,
|
||||
val outOwnerId: Int,
|
||||
val outOwner: GraphStageLogic,
|
||||
var inHandler: InHandler,
|
||||
var outHandler: OutHandler
|
||||
) {
|
||||
var portState: Int = InReady
|
||||
var slot: Any = Empty
|
||||
|
||||
override def toString = s"Connection($id, $portState, $slot, $inHandler, $outHandler)"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
|
|
@ -115,16 +148,14 @@ object GraphInterpreter {
|
|||
* handlers and the stage logic instances.
|
||||
*
|
||||
* Returns a tuple of
|
||||
* - lookup table for InHandlers
|
||||
* - lookup table for OutHandlers
|
||||
* - lookup table for Connections
|
||||
* - array of the logics
|
||||
* - materialized value
|
||||
*/
|
||||
def materialize(
|
||||
inheritedAttributes: Attributes,
|
||||
copiedModules: Array[Module],
|
||||
matVal: ju.Map[Module, Any],
|
||||
register: MaterializedValueSource[Any] ⇒ Unit): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic]) = {
|
||||
register: MaterializedValueSource[Any] ⇒ Unit): (Array[Connection], Array[GraphStageLogic]) = {
|
||||
val logics = Array.ofDim[GraphStageLogic](stages.length)
|
||||
|
||||
var i = 0
|
||||
|
|
@ -165,32 +196,43 @@ object GraphInterpreter {
|
|||
i += 1
|
||||
}
|
||||
|
||||
val inHandlers = Array.ofDim[InHandler](connectionCount)
|
||||
val outHandlers = Array.ofDim[OutHandler](connectionCount)
|
||||
val connections = Array.ofDim[Connection](connectionCount)
|
||||
|
||||
i = 0
|
||||
while (i < connectionCount) {
|
||||
connections(i) = new Connection(
|
||||
id = i,
|
||||
inOwner = if (inOwners(i) == Boundary) null else logics(inOwners(i)),
|
||||
inOwnerId = inOwners(i),
|
||||
outOwner = if (outOwners(i) == Boundary) null else logics(outOwners(i)),
|
||||
outOwnerId = outOwners(i),
|
||||
inHandler = null,
|
||||
outHandler = null
|
||||
)
|
||||
|
||||
if (ins(i) ne null) {
|
||||
val logic = logics(inOwners(i))
|
||||
logic.handlers(ins(i).id) match {
|
||||
case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${ins(i)}")
|
||||
case h: InHandler ⇒ inHandlers(i) = h
|
||||
case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${ins(i)}")
|
||||
case h: InHandler ⇒
|
||||
connections(i).inHandler = h
|
||||
}
|
||||
logics(inOwners(i)).portToConn(ins(i).id) = i
|
||||
logics(inOwners(i)).portToConn(ins(i).id) = connections(i)
|
||||
}
|
||||
if (outs(i) ne null) {
|
||||
val logic = logics(outOwners(i))
|
||||
val inCount = logic.inCount
|
||||
logic.handlers(outs(i).id + inCount) match {
|
||||
case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${outs(i)}")
|
||||
case h: OutHandler ⇒ outHandlers(i) = h
|
||||
case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${outs(i)}")
|
||||
case h: OutHandler ⇒
|
||||
connections(i).outHandler = h
|
||||
}
|
||||
logic.portToConn(outs(i).id + inCount) = i
|
||||
logic.portToConn(outs(i).id + inCount) = connections(i)
|
||||
}
|
||||
i += 1
|
||||
}
|
||||
|
||||
(inHandlers, outHandlers, logics)
|
||||
(connections, logics)
|
||||
}
|
||||
|
||||
override def toString: String = {
|
||||
|
|
@ -291,25 +333,27 @@ object GraphInterpreter {
|
|||
* The internal architecture of the interpreter is based on the usage of arrays and optimized for reducing allocations
|
||||
* on the hot paths.
|
||||
*
|
||||
* One of the basic abstractions inside the interpreter is the notion of *connection*. In the abstract sense a
|
||||
* connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair),
|
||||
* while in the practical sense a connection is a number which represents slots in certain arrays.
|
||||
* One of the basic abstractions inside the interpreter is the [[akka.stream.impl.fusing.GraphInterpreter.Connection]].
|
||||
* A connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair).
|
||||
* The Connection object contains all the necessary data for the interpreter to pass elements, demand, completion
|
||||
* or errors across the Connection.
|
||||
* In particular
|
||||
* - portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this
|
||||
* connection. This bitfield is used to decode the event that is in-flight.
|
||||
* - connectionSlots is a mapping from a connection id to a potential element or exception that accompanies the
|
||||
* - connectionSlot contains a potential element or exception that accompanies the
|
||||
* event encoded in the portStates bitfield
|
||||
* - inHandlers is a mapping from a connection id to the [[InHandler]] instance that handles the events corresponding
|
||||
* - inHandler contains the [[InHandler]] instance that handles the events corresponding
|
||||
* to the input port of the connection
|
||||
* - outHandlers is a mapping from a connection id to the [[OutHandler]] instance that handles the events corresponding
|
||||
* - outHandler contains the [[OutHandler]] instance that handles the events corresponding
|
||||
* to the output port of the connection
|
||||
*
|
||||
* On top of these lookup tables there is an eventQueue, represented as a circular buffer of integers. The integers
|
||||
* it contains represents connections that have pending events to be processed. The pending event itself is encoded
|
||||
* in the portStates bitfield. This implies that there can be only one event in flight for a given connection, which
|
||||
* is true in almost all cases, except a complete-after-push or fail-after-push.
|
||||
* On top of the Connection table there is an eventQueue, represented as a circular buffer of Connections. The queue
|
||||
* contains the Connections that have pending events to be processed. The pending event itself is encoded
|
||||
* in the portState bitfield of the Connection. This implies that there can be only one event in flight for a given
|
||||
* Connection, which is true in almost all cases, except a complete-after-push or fail-after-push which has to
|
||||
* be decoded accordingly.
|
||||
*
|
||||
* The layout of the portStates bitfield is the following:
|
||||
* The layout of the portState bitfield is the following:
|
||||
*
|
||||
* |- state machn.-| Only one bit is hot among these bits
|
||||
* 64 32 16 | 8 4 2 1 |
|
||||
|
|
@ -333,10 +377,10 @@ object GraphInterpreter {
|
|||
* Sending an event is usually the following sequence:
|
||||
* - An action is requested by a stage logic (push, pull, complete, etc.)
|
||||
* - the state machine in portStates is transitioned from a ready state to a pending event
|
||||
* - the id of the affected connection is enqueued
|
||||
* - the affected Connection is enqueued
|
||||
*
|
||||
* Receiving an event is usually the following sequence:
|
||||
* - id of connection to be processed is dequeued
|
||||
* - the Connection to be processed is dequeued
|
||||
* - the type of the event is determined from the bits set on portStates
|
||||
* - the state machine in portStates is transitioned to a ready state
|
||||
* - using the inHandlers/outHandlers table the corresponding callback is called on the stage logic.
|
||||
|
|
@ -350,9 +394,8 @@ final class GraphInterpreter(
|
|||
private val assembly: GraphInterpreter.GraphAssembly,
|
||||
val materializer: Materializer,
|
||||
val log: LoggingAdapter,
|
||||
val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection
|
||||
val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection
|
||||
val logics: Array[GraphStageLogic], // Array of stage logics
|
||||
val connections: Array[GraphInterpreter.Connection],
|
||||
val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit,
|
||||
val fuzzingMode: Boolean,
|
||||
val context: ActorRef) {
|
||||
|
|
@ -360,11 +403,11 @@ final class GraphInterpreter(
|
|||
|
||||
// Maintains additional information for events, basically elements in-flight, or failure.
|
||||
// Other events are encoded in the portStates bitfield.
|
||||
val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty)
|
||||
//val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty)
|
||||
|
||||
// Bitfield encoding pending events and various states for efficient querying and updates. See the documentation
|
||||
// of the class for a full description.
|
||||
val portStates = Array.fill[Int](assembly.connectionCount)(InReady)
|
||||
//val portStates = Array.fill[Int](assembly.connectionCount)(InReady)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -381,20 +424,24 @@ final class GraphInterpreter(
|
|||
shape.inlets.size + shape.outlets.size
|
||||
}
|
||||
|
||||
private var _subFusingMaterializer: Materializer = _
|
||||
private[this] var _subFusingMaterializer: Materializer = _
|
||||
def subFusingMaterializer: Materializer = _subFusingMaterializer
|
||||
|
||||
// An event queue implemented as a circular buffer
|
||||
// FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue
|
||||
private[this] val eventQueue = Array.ofDim[Int](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1)))
|
||||
private[this] val eventQueue = Array.ofDim[Connection](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1)))
|
||||
private[this] val mask = eventQueue.length - 1
|
||||
private[this] var queueHead: Int = 0
|
||||
private[this] var queueTail: Int = 0
|
||||
|
||||
private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased
|
||||
private[this] var chasedPush: Connection = NoEvent
|
||||
private[this] var chasedPull: Connection = NoEvent
|
||||
|
||||
private def queueStatus: String = {
|
||||
val contents = (queueHead until queueTail).map(idx ⇒ {
|
||||
val conn = eventQueue(idx & mask)
|
||||
(conn, portStates(conn), connectionSlots(conn))
|
||||
conn
|
||||
})
|
||||
s"(${eventQueue.length}, $queueHead, $queueTail)(${contents.mkString(", ")})"
|
||||
}
|
||||
|
|
@ -414,36 +461,42 @@ final class GraphInterpreter(
|
|||
* Assign the boundary logic to a given connection. This will serve as the interface to the external world
|
||||
* (outside the interpreter) to process and inject events.
|
||||
*/
|
||||
def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit = {
|
||||
def attachUpstreamBoundary(connection: Connection, logic: UpstreamBoundaryStageLogic[_]): Unit = {
|
||||
logic.portToConn(logic.out.id + logic.inCount) = connection
|
||||
logic.interpreter = this
|
||||
outHandlers(connection) = logic.handlers(0).asInstanceOf[OutHandler]
|
||||
connection.outHandler = logic.handlers(0).asInstanceOf[OutHandler]
|
||||
}
|
||||
|
||||
def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit =
|
||||
attachUpstreamBoundary(connections(connection), logic)
|
||||
|
||||
/**
|
||||
* Assign the boundary logic to a given connection. This will serve as the interface to the external world
|
||||
* (outside the interpreter) to process and inject events.
|
||||
*/
|
||||
def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit = {
|
||||
def attachDownstreamBoundary(connection: Connection, logic: DownstreamBoundaryStageLogic[_]): Unit = {
|
||||
logic.portToConn(logic.in.id) = connection
|
||||
logic.interpreter = this
|
||||
inHandlers(connection) = logic.handlers(0).asInstanceOf[InHandler]
|
||||
connection.inHandler = logic.handlers(0).asInstanceOf[InHandler]
|
||||
}
|
||||
|
||||
def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit =
|
||||
attachDownstreamBoundary(connections(connection), logic)
|
||||
|
||||
/**
|
||||
* Dynamic handler changes are communicated from a GraphStageLogic by this method.
|
||||
*/
|
||||
def setHandler(connection: Int, handler: InHandler): Unit = {
|
||||
def setHandler(connection: Connection, handler: InHandler): Unit = {
|
||||
if (Debug) println(s"$Name SETHANDLER ${inOwnerName(connection)} (in) $handler")
|
||||
inHandlers(connection) = handler
|
||||
connection.inHandler = handler
|
||||
}
|
||||
|
||||
/**
|
||||
* Dynamic handler changes are communicated from a GraphStageLogic by this method.
|
||||
*/
|
||||
def setHandler(connection: Int, handler: OutHandler): Unit = {
|
||||
def setHandler(connection: Connection, handler: OutHandler): Unit = {
|
||||
if (Debug) println(s"$Name SETHANDLER ${outOwnerName(connection)} (out) $handler")
|
||||
outHandlers(connection) = handler
|
||||
connection.outHandler = handler
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -496,29 +549,29 @@ final class GraphInterpreter(
|
|||
}
|
||||
|
||||
// Debug name for a connections input part
|
||||
private def inOwnerName(connection: Int): String =
|
||||
assembly.inOwners(connection) match {
|
||||
private def inOwnerName(connection: Connection): String =
|
||||
assembly.inOwners(connection.id) match {
|
||||
case Boundary ⇒ "DownstreamBoundary"
|
||||
case owner ⇒ assembly.stages(owner).toString
|
||||
}
|
||||
|
||||
// Debug name for a connections output part
|
||||
private def outOwnerName(connection: Int): String =
|
||||
assembly.outOwners(connection) match {
|
||||
private def outOwnerName(connection: Connection): String =
|
||||
assembly.outOwners(connection.id) match {
|
||||
case Boundary ⇒ "UpstreamBoundary"
|
||||
case owner ⇒ assembly.stages(owner).toString
|
||||
}
|
||||
|
||||
// Debug name for a connections input part
|
||||
private def inLogicName(connection: Int): String =
|
||||
assembly.inOwners(connection) match {
|
||||
private def inLogicName(connection: Connection): String =
|
||||
assembly.inOwners(connection.id) match {
|
||||
case Boundary ⇒ "DownstreamBoundary"
|
||||
case owner ⇒ logics(owner).toString
|
||||
}
|
||||
|
||||
// Debug name for a connections output part
|
||||
private def outLogicName(connection: Int): String =
|
||||
assembly.outOwners(connection) match {
|
||||
private def outLogicName(connection: Connection): String =
|
||||
assembly.outOwners(connection.id) match {
|
||||
case Boundary ⇒ "UpstreamBoundary"
|
||||
case owner ⇒ logics(owner).toString
|
||||
}
|
||||
|
|
@ -539,20 +592,94 @@ final class GraphInterpreter(
|
|||
try {
|
||||
while (eventsRemaining > 0 && queueTail != queueHead) {
|
||||
val connection = dequeue()
|
||||
eventsRemaining -= 1
|
||||
chaseCounter = math.min(ChaseLimit, eventsRemaining)
|
||||
|
||||
def reportStageError(e: Throwable): Unit = {
|
||||
if (activeStage == null) throw e
|
||||
else {
|
||||
val stage = assembly.stages(activeStage.stageId)
|
||||
|
||||
log.error(e, "Error in stage [{}]: {}", stage, e.getMessage)
|
||||
activeStage.failStage(e)
|
||||
|
||||
// Abort chasing
|
||||
chaseCounter = 0
|
||||
if (chasedPush ne NoEvent) {
|
||||
enqueue(chasedPush)
|
||||
chasedPush = NoEvent
|
||||
}
|
||||
if (chasedPull ne NoEvent) {
|
||||
enqueue(chasedPull)
|
||||
chasedPull = NoEvent
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This is the "normal" event processing code which dequeues directly from the internal event queue. Since
|
||||
* most execution paths tend to produce either a Push that will be propagated along a longer chain we take
|
||||
* extra steps below to make this more efficient.
|
||||
*/
|
||||
try processEvent(connection)
|
||||
catch {
|
||||
case NonFatal(e) ⇒
|
||||
if (activeStage == null) throw e
|
||||
else {
|
||||
val stage = assembly.stages(activeStage.stageId)
|
||||
|
||||
log.error(e, "Error in stage [{}]: {}", stage, e.getMessage)
|
||||
activeStage.failStage(e)
|
||||
}
|
||||
case NonFatal(e) ⇒ reportStageError(e)
|
||||
}
|
||||
afterStageHasRun(activeStage)
|
||||
eventsRemaining -= 1
|
||||
|
||||
/*
|
||||
* "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or
|
||||
* Pull is very likely immediately followed by another Push/Pull. The difference from the "normal" event
|
||||
* dispatch is that chased events are never touching the event queue, they use a "streamlined" execution path
|
||||
* instead. Looking at the scenario of a Push, the following events will happen.
|
||||
* - "normal" dispatch executes an onPush event
|
||||
* - stage eventually calls push()
|
||||
* - code inside the push() method checks the validity of the call, and also if it can be safely ignored
|
||||
* (because the target stage already completed we just have not been notified yet)
|
||||
* - if the upper limit of ChaseLimit has not been reached, then the Connection is put into the chasedPush
|
||||
* variable
|
||||
* - the loop below immediately captures this push and dispatches it
|
||||
*
|
||||
* What is saved by this optimization is three steps:
|
||||
* - no need to enqueue the Connection in the queue (array), it ends up in a simple variable, reducing
|
||||
* pressure on array load-store
|
||||
* - no need to dequeue the Connection from the queue, similar to above
|
||||
* - no need to decode the event, we know it is a Push already
|
||||
* - no need to check for validity of the event because we already checked at the push() call, and there
|
||||
* can be no concurrent events interleaved unlike with the normal dispatch (think about a cancel() that is
|
||||
* called in the target stage just before the onPush() arrives). This avoids unnecessary branching.
|
||||
*/
|
||||
|
||||
// Chasing PUSH events
|
||||
while (chasedPush != NoEvent) {
|
||||
val connection = chasedPush
|
||||
chasedPush = NoEvent
|
||||
try processPush(connection)
|
||||
catch {
|
||||
case NonFatal(e) ⇒ reportStageError(e)
|
||||
}
|
||||
afterStageHasRun(activeStage)
|
||||
}
|
||||
|
||||
// Chasing PULL events
|
||||
while (chasedPull != NoEvent) {
|
||||
val connection = chasedPull
|
||||
chasedPull = NoEvent
|
||||
try processPull(connection)
|
||||
catch {
|
||||
case NonFatal(e) ⇒ reportStageError(e)
|
||||
}
|
||||
afterStageHasRun(activeStage)
|
||||
}
|
||||
|
||||
if (chasedPush != NoEvent) {
|
||||
enqueue(chasedPush)
|
||||
chasedPush = NoEvent
|
||||
}
|
||||
|
||||
}
|
||||
// Event *must* be enqueued while not in the execute loop (events enqueued from external, possibly async events)
|
||||
chaseCounter = 0
|
||||
} finally {
|
||||
currentInterpreterHolder(0) = previousInterpreter
|
||||
}
|
||||
|
|
@ -578,65 +705,64 @@ final class GraphInterpreter(
|
|||
}
|
||||
|
||||
// Decodes and processes a single event for the given connection
|
||||
private def processEvent(connection: Int): Unit = {
|
||||
def safeLogics(id: Int) =
|
||||
if (id == Boundary) null
|
||||
else logics(id)
|
||||
|
||||
def processElement(): Unit = {
|
||||
if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connectionSlots(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]")
|
||||
activeStage = safeLogics(assembly.inOwners(connection))
|
||||
portStates(connection) ^= PushEndFlip
|
||||
inHandlers(connection).onPush()
|
||||
}
|
||||
private def processEvent(connection: Connection): Unit = {
|
||||
|
||||
// this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage
|
||||
// (this can happen if a stage completes voluntarily while connection close events are still queued)
|
||||
activeStage = null
|
||||
val code = portStates(connection)
|
||||
val code = connection.portState
|
||||
|
||||
// Manual fast decoding, fast paths are PUSH and PULL
|
||||
// PUSH
|
||||
if ((code & (Pushing | InClosed | OutClosed)) == Pushing) {
|
||||
processElement()
|
||||
processPush(connection)
|
||||
|
||||
// PULL
|
||||
} else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) {
|
||||
if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]")
|
||||
portStates(connection) ^= PullEndFlip
|
||||
activeStage = safeLogics(assembly.outOwners(connection))
|
||||
outHandlers(connection).onPull()
|
||||
processPull(connection)
|
||||
|
||||
// CANCEL
|
||||
} else if ((code & (OutClosed | InClosed)) == InClosed) {
|
||||
val stageId = assembly.outOwners(connection)
|
||||
activeStage = safeLogics(stageId)
|
||||
if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]")
|
||||
portStates(connection) |= OutClosed
|
||||
completeConnection(stageId)
|
||||
outHandlers(connection).onDownstreamFinish()
|
||||
activeStage = connection.outOwner
|
||||
if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
|
||||
connection.portState |= OutClosed
|
||||
completeConnection(connection.outOwnerId)
|
||||
connection.outHandler.onDownstreamFinish()
|
||||
} else if ((code & (OutClosed | InClosed)) == OutClosed) {
|
||||
// COMPLETIONS
|
||||
|
||||
if ((code & Pushing) == 0) {
|
||||
// Normal completion (no push pending)
|
||||
if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]")
|
||||
portStates(connection) |= InClosed
|
||||
val stageId = assembly.inOwners(connection)
|
||||
activeStage = safeLogics(stageId)
|
||||
completeConnection(stageId)
|
||||
if ((portStates(connection) & InFailed) == 0) inHandlers(connection).onUpstreamFinish()
|
||||
else inHandlers(connection).onUpstreamFailure(connectionSlots(connection).asInstanceOf[Failed].ex)
|
||||
if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]")
|
||||
connection.portState |= InClosed
|
||||
activeStage = connection.inOwner
|
||||
completeConnection(connection.inOwnerId)
|
||||
if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish()
|
||||
else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex)
|
||||
} else {
|
||||
// Push is pending, first process push, then re-enqueue closing event
|
||||
processElement()
|
||||
processPush(connection)
|
||||
enqueue(connection)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private def dequeue(): Int = {
|
||||
private def processPush(connection: Connection): Unit = {
|
||||
if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connection.slot} (${connection.inHandler}) [${inLogicName(connection)}]")
|
||||
activeStage = connection.inOwner
|
||||
connection.portState ^= PushEndFlip
|
||||
connection.inHandler.onPush()
|
||||
}
|
||||
|
||||
private def processPull(connection: Connection): Unit = {
|
||||
if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
|
||||
activeStage = connection.outOwner
|
||||
connection.portState ^= PullEndFlip
|
||||
connection.outHandler.onPull()
|
||||
}
|
||||
|
||||
private def dequeue(): Connection = {
|
||||
val idx = queueHead & mask
|
||||
if (fuzzingMode) {
|
||||
val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask
|
||||
|
|
@ -650,7 +776,7 @@ final class GraphInterpreter(
|
|||
elem
|
||||
}
|
||||
|
||||
private def enqueue(connection: Int): Unit = {
|
||||
def enqueue(connection: Connection): Unit = {
|
||||
if (Debug) if (queueTail - queueHead > mask) new Exception(s"$Name internal queue full ($queueStatus) + $connection").printStackTrace()
|
||||
eventQueue(queueTail & mask) = connection
|
||||
queueTail += 1
|
||||
|
|
@ -688,52 +814,55 @@ final class GraphInterpreter(
|
|||
}
|
||||
}
|
||||
|
||||
private[stream] def push(connection: Int, elem: Any): Unit = {
|
||||
val currentState = portStates(connection)
|
||||
portStates(connection) = currentState ^ PushStartFlip
|
||||
if ((currentState & InClosed) == 0) {
|
||||
connectionSlots(connection) = elem
|
||||
enqueue(connection)
|
||||
}
|
||||
private[stream] def chasePush(connection: Connection): Unit = {
|
||||
if (chaseCounter > 0 && chasedPush == NoEvent) {
|
||||
chaseCounter -= 1
|
||||
chasedPush = connection
|
||||
} else enqueue(connection)
|
||||
}
|
||||
|
||||
private[stream] def pull(connection: Int): Unit = {
|
||||
val currentState = portStates(connection)
|
||||
portStates(connection) = currentState ^ PullStartFlip
|
||||
if ((currentState & OutClosed) == 0) {
|
||||
enqueue(connection)
|
||||
}
|
||||
private[stream] def chasePull(connection: Connection): Unit = {
|
||||
if (chaseCounter > 0 && chasedPull == NoEvent) {
|
||||
chaseCounter -= 1
|
||||
chasedPull = connection
|
||||
} else enqueue(connection)
|
||||
}
|
||||
|
||||
private[stream] def complete(connection: Int): Unit = {
|
||||
val currentState = portStates(connection)
|
||||
private[stream] def complete(connection: Connection): Unit = {
|
||||
val currentState = connection.portState
|
||||
if (Debug) println(s"$Name complete($connection) [$currentState]")
|
||||
portStates(connection) = currentState | OutClosed
|
||||
if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection)
|
||||
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
|
||||
connection.portState = currentState | OutClosed
|
||||
|
||||
// Push-Close needs special treatment, cannot be chased, convert back to ordinary event
|
||||
if (chasedPush == connection) {
|
||||
chasedPush = NoEvent
|
||||
enqueue(connection)
|
||||
} else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection)
|
||||
|
||||
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId)
|
||||
}
|
||||
|
||||
private[stream] def fail(connection: Int, ex: Throwable): Unit = {
|
||||
val currentState = portStates(connection)
|
||||
private[stream] def fail(connection: Connection, ex: Throwable): Unit = {
|
||||
val currentState = connection.portState
|
||||
if (Debug) println(s"$Name fail($connection, $ex) [$currentState]")
|
||||
portStates(connection) = currentState | OutClosed
|
||||
connection.portState = currentState | OutClosed
|
||||
if ((currentState & (InClosed | OutClosed)) == 0) {
|
||||
portStates(connection) = currentState | (OutClosed | InFailed)
|
||||
connectionSlots(connection) = Failed(ex, connectionSlots(connection))
|
||||
connection.portState = currentState | (OutClosed | InFailed)
|
||||
connection.slot = Failed(ex, connection.slot)
|
||||
if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection)
|
||||
}
|
||||
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
|
||||
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId)
|
||||
}
|
||||
|
||||
private[stream] def cancel(connection: Int): Unit = {
|
||||
val currentState = portStates(connection)
|
||||
private[stream] def cancel(connection: Connection): Unit = {
|
||||
val currentState = connection.portState
|
||||
if (Debug) println(s"$Name cancel($connection) [$currentState]")
|
||||
portStates(connection) = currentState | InClosed
|
||||
connection.portState = currentState | InClosed
|
||||
if ((currentState & OutClosed) == 0) {
|
||||
connectionSlots(connection) = Empty
|
||||
connection.slot = Empty
|
||||
if ((currentState & (Pulling | Pushing | InClosed)) == 0) enqueue(connection)
|
||||
}
|
||||
if ((currentState & InClosed) == 0) completeConnection(assembly.inOwners(connection))
|
||||
if ((currentState & InClosed) == 0) completeConnection(connection.inOwnerId)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -762,8 +891,8 @@ final class GraphInterpreter(
|
|||
else "N" + owner
|
||||
}
|
||||
|
||||
for (i ← portStates.indices) {
|
||||
portStates(i) match {
|
||||
for (i ← connections.indices) {
|
||||
connections(i).portState match {
|
||||
case InReady ⇒
|
||||
builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [label=shouldPull; color=blue]""")
|
||||
case OutReady ⇒
|
||||
|
|
|
|||
|
|
@ -137,20 +137,19 @@ private[akka] class IteratorInterpreter[I, O](
|
|||
}
|
||||
val assembly = new GraphAssembly(stagesArray, attributes, ins, inOwners, outs, outOwners)
|
||||
|
||||
val (inHandlers, outHandlers, logics) =
|
||||
val (connections, logics) =
|
||||
assembly.materialize(Attributes.none, assembly.stages.map(_.module), new ju.HashMap, _ ⇒ ())
|
||||
val interpreter = new GraphInterpreter(
|
||||
assembly,
|
||||
NoMaterializer,
|
||||
NoLogging,
|
||||
inHandlers,
|
||||
outHandlers,
|
||||
logics,
|
||||
connections,
|
||||
(_, _, _) ⇒ throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events."),
|
||||
fuzzingMode = false,
|
||||
null)
|
||||
interpreter.attachUpstreamBoundary(0, upstream)
|
||||
interpreter.attachDownstreamBoundary(length, downstream)
|
||||
interpreter.attachUpstreamBoundary(connections(0), upstream)
|
||||
interpreter.attachDownstreamBoundary(connections(length), downstream)
|
||||
interpreter.init(null)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,10 +25,33 @@ import akka.stream.impl.Stages.DefaultAttributes
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] {
|
||||
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = ctx.push(f(elem))
|
||||
// FIXME: Not final because InterpreterSupervisionSpec. Some better option is needed here
|
||||
case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Out]] {
|
||||
val in = Inlet[In]("Map.in")
|
||||
val out = Outlet[Out]("Map.out")
|
||||
override val shape = FlowShape(in, out)
|
||||
override def initialAttributes: Attributes = DefaultAttributes.map
|
||||
|
||||
override def decide(t: Throwable): Supervision.Directive = decider(t)
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
private def decider =
|
||||
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
|
||||
override def onPush(): Unit = {
|
||||
try {
|
||||
push(out, f(grab(in)))
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒ decider(ex) match {
|
||||
case Supervision.Stop ⇒ failStage(ex)
|
||||
case _ ⇒ pull(in)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -356,19 +379,24 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta
|
|||
override val initialAttributes = DefaultAttributes.fold
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
private var aggregator: Out = zero
|
||||
|
||||
override def onResume(t: Throwable): Unit = {
|
||||
aggregator = zero
|
||||
}
|
||||
private def decider =
|
||||
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
|
||||
override def onPush(): Unit = withSupervision(() ⇒ grab(in)) match {
|
||||
case Some(elem) ⇒ {
|
||||
aggregator = f(aggregator, elem)
|
||||
override def onPush(): Unit = {
|
||||
try {
|
||||
aggregator = f(aggregator, grab(in))
|
||||
pull(in)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒ decider(ex) match {
|
||||
case Supervision.Stop ⇒ failStage(ex)
|
||||
case _ ⇒
|
||||
aggregator = zero
|
||||
pull(in)
|
||||
}
|
||||
}
|
||||
case None ⇒ pull(in)
|
||||
}
|
||||
|
||||
override def onPull(): Unit = {
|
||||
|
|
|
|||
|
|
@ -475,7 +475,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
*/
|
||||
def map[T](f: Out ⇒ T): Repr[T] = andThen(Map(f))
|
||||
def map[T](f: Out ⇒ T): Repr[T] = via(Map(f))
|
||||
|
||||
/**
|
||||
* Transform each input element into an `Iterable` of output elements that is
|
||||
|
|
|
|||
|
|
@ -220,7 +220,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
* INTERNAL API
|
||||
*/
|
||||
// Using common array to reduce overhead for small port counts
|
||||
private[stream] val portToConn = Array.ofDim[Int](handlers.length)
|
||||
private[stream] val portToConn = Array.ofDim[Connection](handlers.length)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -318,8 +318,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
if (_interpreter != null) _interpreter.setHandler(conn(out), handler)
|
||||
}
|
||||
|
||||
private def conn(in: Inlet[_]): Int = portToConn(in.id)
|
||||
private def conn(out: Outlet[_]): Int = portToConn(out.id + inCount)
|
||||
private def conn(in: Inlet[_]): Connection = portToConn(in.id)
|
||||
private def conn(out: Outlet[_]): Connection = portToConn(out.id + inCount)
|
||||
|
||||
/**
|
||||
* Retrieves the current callback for the events on the given [[Outlet]]
|
||||
|
|
@ -340,12 +340,21 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
* query whether pull is allowed to be called or not. This method will also fail if the port is already closed.
|
||||
*/
|
||||
final protected def pull[T](in: Inlet[T]): Unit = {
|
||||
if ((interpreter.portStates(conn(in)) & (InReady | InClosed)) == InReady) {
|
||||
interpreter.pull(conn(in))
|
||||
val connection = conn(in)
|
||||
val portState = connection.portState
|
||||
val it = interpreter
|
||||
|
||||
if ((portState & (InReady | InClosed | OutClosed)) == InReady) {
|
||||
connection.portState = portState ^ PullStartFlip
|
||||
it.chasePull(connection)
|
||||
} else {
|
||||
// Detailed error information should not add overhead to the hot path
|
||||
require(!isClosed(in), s"Cannot pull closed port ($in)")
|
||||
require(!hasBeenPulled(in), s"Cannot pull port ($in) twice")
|
||||
|
||||
// There were no errors, the pull was simply ignored as the target stage already closed its port. We
|
||||
// still need to track proper state though.
|
||||
connection.portState = portState ^ PullStartFlip
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -371,18 +380,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
*/
|
||||
final protected def grab[T](in: Inlet[T]): T = {
|
||||
val connection = conn(in)
|
||||
val it = interpreter
|
||||
val elem = connection.slot
|
||||
|
||||
// Fast path
|
||||
if ((interpreter.portStates(connection) & (InReady | InFailed)) == InReady &&
|
||||
(interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty)) {
|
||||
val elem = interpreter.connectionSlots(connection)
|
||||
interpreter.connectionSlots(connection) = Empty
|
||||
if ((connection.portState & (InReady | InFailed)) == InReady && (elem.asInstanceOf[AnyRef] ne Empty)) {
|
||||
connection.slot = Empty
|
||||
elem.asInstanceOf[T]
|
||||
} else {
|
||||
// Slow path
|
||||
require(isAvailable(in), s"Cannot get element from already empty input port ($in)")
|
||||
val failed = interpreter.connectionSlots(connection).asInstanceOf[Failed]
|
||||
val failed = connection.slot.asInstanceOf[Failed]
|
||||
val elem = failed.previousElem.asInstanceOf[T]
|
||||
interpreter.connectionSlots(connection) = Failed(failed.ex, Empty)
|
||||
connection.slot = Failed(failed.ex, Empty)
|
||||
elem
|
||||
}
|
||||
}
|
||||
|
|
@ -391,7 +401,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
* Indicates whether there is already a pending pull for the given input port. If this method returns true
|
||||
* then [[isAvailable()]] must return false for that same port.
|
||||
*/
|
||||
final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & (InReady | InClosed)) == 0
|
||||
final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (conn(in).portState & (InReady | InClosed)) == 0
|
||||
|
||||
/**
|
||||
* Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the
|
||||
|
|
@ -402,14 +412,14 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
final protected def isAvailable[T](in: Inlet[T]): Boolean = {
|
||||
val connection = conn(in)
|
||||
|
||||
val normalArrived = (interpreter.portStates(conn(in)) & (InReady | InFailed)) == InReady
|
||||
val normalArrived = (conn(in).portState & (InReady | InFailed)) == InReady
|
||||
|
||||
// Fast path
|
||||
if (normalArrived) interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty
|
||||
if (normalArrived) connection.slot.asInstanceOf[AnyRef] ne Empty
|
||||
else {
|
||||
// Slow path on failure
|
||||
if ((interpreter.portStates(conn(in)) & (InReady | InFailed)) == (InReady | InFailed)) {
|
||||
interpreter.connectionSlots(connection) match {
|
||||
if ((connection.portState & (InReady | InFailed)) == (InReady | InFailed)) {
|
||||
connection.slot match {
|
||||
case Failed(_, elem) ⇒ elem.asInstanceOf[AnyRef] ne Empty
|
||||
case _ ⇒ false // This can only be Empty actually (if a cancel was concurrent with a failure)
|
||||
}
|
||||
|
|
@ -420,7 +430,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
/**
|
||||
* Indicates whether the port has been closed. A closed port cannot be pulled.
|
||||
*/
|
||||
final protected def isClosed[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & InClosed) != 0
|
||||
final protected def isClosed[T](in: Inlet[T]): Boolean = (conn(in).portState & InClosed) != 0
|
||||
|
||||
/**
|
||||
* Emits an element through the given output port. Calling this method twice before a [[pull()]] has been arrived
|
||||
|
|
@ -428,13 +438,26 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
* used to check if the port is ready to be pushed or not.
|
||||
*/
|
||||
final protected def push[T](out: Outlet[T], elem: T): Unit = {
|
||||
if ((interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady && (elem != null)) {
|
||||
interpreter.push(conn(out), elem)
|
||||
val connection = conn(out)
|
||||
val portState = connection.portState
|
||||
val it = interpreter
|
||||
|
||||
connection.portState = portState ^ PushStartFlip
|
||||
|
||||
if ((portState & (OutReady | OutClosed | InClosed)) == OutReady && (elem != null)) {
|
||||
connection.slot = elem
|
||||
it.chasePush(connection)
|
||||
} else {
|
||||
// Restore state for the error case
|
||||
connection.portState = portState
|
||||
|
||||
// Detailed error information should not add overhead to the hot path
|
||||
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
||||
require(isAvailable(out), s"Cannot push port ($out) twice")
|
||||
require(!isClosed(out), s"Cannot pull closed port ($out)")
|
||||
|
||||
// No error, just InClosed caused the actual pull to be ignored, but the status flag still needs to be flipped
|
||||
connection.portState = portState ^ PushStartFlip
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -500,12 +523,12 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
* Return true if the given output port is ready to be pushed.
|
||||
*/
|
||||
final def isAvailable[T](out: Outlet[T]): Boolean =
|
||||
(interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady
|
||||
(conn(out).portState & (OutReady | OutClosed)) == OutReady
|
||||
|
||||
/**
|
||||
* Indicates whether the port has been closed. A closed port cannot be pushed.
|
||||
*/
|
||||
final protected def isClosed[T](out: Outlet[T]): Boolean = (interpreter.portStates(conn(out)) & OutClosed) != 0
|
||||
final protected def isClosed[T](out: Outlet[T]): Boolean = (conn(out).portState & OutClosed) != 0
|
||||
|
||||
/**
|
||||
* Read a number of elements from the given inlet and continue with the given function,
|
||||
|
|
|
|||
|
|
@ -903,7 +903,10 @@ object MiMa extends AutoPlugin {
|
|||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.protobuf.msg.DistributedPubSubMessages#StatusOrBuilder.hasReplyToStatus"),
|
||||
|
||||
// #20543 GraphStage subtypes should not be private to akka
|
||||
ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.ActorMaterializer.actorOf")
|
||||
ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.ActorMaterializer.actorOf"),
|
||||
|
||||
// Interpreter internals change
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.stage.GraphStageLogic.portToConn")
|
||||
),
|
||||
"2.4.9" -> Seq(
|
||||
// #20994 adding new decode method, since we're on JDK7+ now
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue