diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala new file mode 100644 index 0000000000..71e5984464 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import java.util.concurrent.atomic.AtomicInteger + +import akka.stream.impl.{ Ast, ActorBasedFlowMaterializer } +import akka.stream.{ FlowMaterializer, MaterializerSettings } +import org.reactivestreams.{ Publisher, Processor } + +class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { + + val processorCounter = new AtomicInteger + + override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize) + + implicit val materializer = FlowMaterializer(settings)(system) + + val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet() + + val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode( + Ast.Fusable(Vector(akka.stream.impl.fusing.Map[Int, Int](identity)), "identity"), flowName, 1) + + processor.asInstanceOf[Processor[Int, Int]] + } + + override def createHelperPublisher(elements: Long): Publisher[Int] = { + implicit val mat = FlowMaterializer()(system) + + createSimpleIntPublisher(elements)(mat) + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala new file mode 100644 index 0000000000..4e02890aae --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -0,0 +1,490 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import scala.util.control.NoStackTrace + +class InterpreterSpec extends InterpreterSpecKit { + + "Interpreter" must { + + "implement map correctly" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1))) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(OnNext(2))) + + upstream.onComplete() + lastEvents() should be(Set(OnComplete)) + } + + "implement chain of maps correctly" in new TestSetup(Seq( + Map((x: Int) ⇒ x + 1), + Map((x: Int) ⇒ x * 2), + Map((x: Int) ⇒ x + 1))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(OnNext(3))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(OnNext(5))) + + downstream.cancel() + lastEvents() should be(Set(Cancel)) + } + + "work with only boundary ops" in new TestSetup(Seq.empty) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(OnNext(0))) + + upstream.onComplete() + lastEvents() should be(Set(OnComplete)) + } + + "implement one-to-many many-to-one chain correctly" in new TestSetup(Seq( + Doubler(), + Filter((x: Int) ⇒ x != 0))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onComplete() + lastEvents() should be(Set(OnComplete)) + } + + "implement many-to-one one-to-many chain correctly" in new TestSetup(Seq( + Filter((x: Int) ⇒ x != 0), + Doubler())) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + downstream.cancel() + lastEvents() should be(Set(Cancel)) + } + + "implement take" in new TestSetup(Seq(Take(2))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(OnNext(0))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(OnNext(1), Cancel, OnComplete)) + } + + "implement take inside a chain" in new TestSetup(Seq( + Filter((x: Int) ⇒ x != 0), + Take(2), + Map((x: Int) ⇒ x + 1))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(OnNext(2))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(Cancel, OnComplete, OnNext(3))) + } + + "implement fold" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x))) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(RequestOne)) + + upstream.onComplete() + lastEvents() should be(Set(OnNext(3), OnComplete)) + } + + "implement fold with proper cancel" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(RequestOne)) + + downstream.cancel() + lastEvents() should be(Set(Cancel)) + } + + "work if fold completes while not in a push position" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x))) { + + lastEvents() should be(Set.empty) + + upstream.onComplete() + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(OnComplete, OnNext(0))) + } + + "implement grouped" in new TestSetup(Seq(Grouped(3))) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(OnNext(Vector(0, 1, 2)))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(3) + lastEvents() should be(Set(RequestOne)) + + upstream.onComplete() + lastEvents() should be(Set(OnNext(Vector(3)), OnComplete)) + } + + "implement conflate" in new TestSetup(Seq(Conflate( + (in: Int) ⇒ in, + (agg: Int, x: Int) ⇒ agg + x))) { + + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set.empty) + + upstream.onNext(0) + lastEvents() should be(Set(OnNext(0), RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(3))) + + downstream.requestOne() + lastEvents() should be(Set.empty) + + upstream.onNext(4) + lastEvents() should be(Set(OnNext(4), RequestOne)) + + downstream.cancel() + lastEvents() should be(Set(Cancel)) + } + + "implement expand" in new TestSetup(Seq(Expand( + (in: Int) ⇒ in, + (agg: Int) ⇒ (agg, agg)))) { + + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne, OnNext(0))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(0))) + + upstream.onNext(1) + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne, OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(1))) + + upstream.onComplete() + lastEvents() should be(Set(OnComplete)) + } + + "work with conflate-conflate" in new TestSetup(Seq( + Conflate( + (in: Int) ⇒ in, + (agg: Int, x: Int) ⇒ agg + x), + Conflate( + (in: Int) ⇒ in, + (agg: Int, x: Int) ⇒ agg + x))) { + + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set.empty) + + upstream.onNext(0) + lastEvents() should be(Set(OnNext(0), RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(3))) + + downstream.requestOne() + lastEvents() should be(Set.empty) + + upstream.onNext(4) + lastEvents() should be(Set(OnNext(4), RequestOne)) + + downstream.cancel() + lastEvents() should be(Set(Cancel)) + + } + + "work with expand-expand" in new TestSetup(Seq( + Expand( + (in: Int) ⇒ in, + (agg: Int) ⇒ (agg, agg)), + Expand( + (in: Int) ⇒ in, + (agg: Int) ⇒ (agg, agg)))) { + + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(0))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(0))) + + upstream.onNext(1) + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne, OnNext(0))) // One zero is still in the pipeline + + downstream.requestOne() + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(1))) + + upstream.onComplete() + lastEvents() should be(Set(OnComplete)) + } + + "implement conflate-expand" in new TestSetup(Seq( + Conflate( + (in: Int) ⇒ in, + (agg: Int, x: Int) ⇒ agg + x), + Expand( + (in: Int) ⇒ in, + (agg: Int) ⇒ (agg, agg)))) { + + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(0))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(1))) + + upstream.onNext(2) + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(2))) + + downstream.cancel() + lastEvents() should be(Set(Cancel)) + } + + "implement expand-conflate" in { + pending + // Needs to detect divergent loops + } + + "implement doubler-conflate" in new TestSetup(Seq( + Doubler(), + Conflate( + (in: Int) ⇒ in, + (agg: Int, x: Int) ⇒ agg + x))) { + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(6))) + + } + + "implement expand-filter" in pending + + "implement take-conflate" in pending + + "implement conflate-take" in pending + + "implement take-expand" in pending + + "implement expand-take" in pending + + "implement take-take" in pending + + "implement take-drop" in pending + + "implement drop-take" in pending + + val TE = new Exception("TEST") with NoStackTrace { + override def toString = "TE" + } + + "handle external failure" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1))) { + lastEvents() should be(Set.empty) + + upstream.onError(TE) + lastEvents() should be(Set(OnError(TE))) + + } + + "handle failure inside op" in new TestSetup(Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x))) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(OnNext(2))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(Cancel, OnError(TE))) + + } + + "handle failure inside op in middle of the chain" in new TestSetup(Seq( + Map((x: Int) ⇒ x + 1), + Map((x: Int) ⇒ if (x == 0) throw TE else x), + Map((x: Int) ⇒ x + 1))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(OnNext(4))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(-1) + lastEvents() should be(Set(Cancel, OnError(TE))) + + } + + "work with keep-going ops" in pending + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala new file mode 100644 index 0000000000..09028e5248 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import akka.stream.testkit.AkkaSpec + +trait InterpreterSpecKit extends AkkaSpec { + + case object OnComplete + case object Cancel + case class OnError(cause: Throwable) + case class OnNext(elem: Any) + case object RequestOne + + private[akka] case class Doubler[T]() extends DeterministicOp[T, T] { + var oneMore: Boolean = false + var lastElem: T = _ + + override def onPush(elem: T, ctxt: Context[T]): Directive = { + lastElem = elem + oneMore = true + ctxt.push(elem) + } + + override def onPull(ctxt: Context[T]): Directive = { + if (oneMore) { + oneMore = false + ctxt.push(lastElem) + } else ctxt.pull() + } + } + + abstract class TestSetup(ops: Seq[Op[_, _, _, _, _]], forkLimit: Int = 100, overflowToHeap: Boolean = false) { + private var lastEvent: Set[Any] = Set.empty + + val upstream = new UpstreamProbe + val downstream = new DownstreamProbe + val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream, forkLimit, overflowToHeap) + interpreter.init() + + def lastEvents(): Set[Any] = { + val result = lastEvent + lastEvent = Set.empty + result + } + + class UpstreamProbe extends BoundaryOp { + + override def onDownstreamFinish(ctxt: BoundaryContext): Directive = { + lastEvent += Cancel + ctxt.exit() + } + + override def onPull(ctxt: BoundaryContext): Directive = { + lastEvent += RequestOne + ctxt.exit() + } + + override def onPush(elem: Any, ctxt: BoundaryContext): Directive = + throw new UnsupportedOperationException("Cannot push the boundary") + + def onNext(elem: Any): Unit = enter().push(elem) + def onComplete(): Unit = enter().finish() + def onError(cause: Throwable): Unit = enter().fail(cause) + + } + + class DownstreamProbe extends BoundaryOp { + override def onPush(elem: Any, ctxt: BoundaryContext): Directive = { + lastEvent += OnNext(elem) + ctxt.exit() + } + + override def onUpstreamFinish(ctxt: BoundaryContext): Directive = { + lastEvent += OnComplete + ctxt.exit() + } + + override def onFailure(cause: Throwable, ctxt: BoundaryContext): Directive = { + lastEvent += OnError(cause) + ctxt.exit() + } + + override def onPull(ctxt: BoundaryContext): Directive = + throw new UnsupportedOperationException("Cannot pull the boundary") + + def requestOne(): Unit = enter().pull() + + def cancel(): Unit = enter().finish() + } + + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala new file mode 100644 index 0000000000..6019aa273d --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala @@ -0,0 +1,115 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl.fusing + +class InterpreterStressSpec extends InterpreterSpecKit { + + val chainLength = 1000 * 1000 + val halfLength = chainLength / 2 + val repetition = 100 + + "Interpreter" must { + + "work with a massive chain of maps" in new TestSetup(Seq.fill(chainLength)(Map((x: Int) ⇒ x + 1))) { + lastEvents() should be(Set.empty) + val tstamp = System.nanoTime() + + var i = 0 + while (i < repetition) { + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(i) + lastEvents() should be(Set(OnNext(i + chainLength))) + i += 1 + } + + upstream.onComplete() + lastEvents() should be(Set(OnComplete)) + + val time = (System.nanoTime() - tstamp) / (1000.0 * 1000.0 * 1000.0) + // FIXME: Not a real benchmark, should be replaced by a proper JMH bench + info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s") + } + + "work with a massive chain of maps with early complete" in new TestSetup(Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1)) ++ + Seq(Take(repetition / 2)) ++ + Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1))) { + + lastEvents() should be(Set.empty) + val tstamp = System.nanoTime() + + var i = 0 + while (i < (repetition / 2) - 1) { + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(i) + lastEvents() should be(Set(OnNext(i + chainLength))) + i += 1 + } + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(Cancel, OnComplete, OnNext(0 + chainLength))) + + val time = (System.nanoTime() - tstamp) / (1000.0 * 1000.0 * 1000.0) + // FIXME: Not a real benchmark, should be replaced by a proper JMH bench + info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s") + } + + "work with a massive chain of takes" in new TestSetup(Seq.fill(chainLength)(Take(1))) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(0) + lastEvents() should be(Set(Cancel, OnNext(0), OnComplete)) + + } + + "work with a massive chain of drops" in new TestSetup(Seq.fill(chainLength / 1000)(Drop(1))) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + var i = 0 + while (i < (chainLength / 1000)) { + upstream.onNext(0) + lastEvents() should be(Set(RequestOne)) + i += 1 + } + + upstream.onNext(0) + lastEvents() should be(Set(OnNext(0))) + + } + + "work with a massive chain of conflates by overflowing to the heap" in new TestSetup(Seq.fill(100000)(Conflate( + (in: Int) ⇒ in, + (agg: Int, in: Int) ⇒ agg + in)), + forkLimit = 100, + overflowToHeap = true) { + + lastEvents() should be(Set(RequestOne)) + + var i = 0 + while (i < repetition) { + upstream.onNext(1) + lastEvents() should be(Set(RequestOne)) + i += 1 + } + + downstream.requestOne() + lastEvents() should be(Set(OnNext(repetition))) + + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index d58302eaab..78c9ccc403 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -5,15 +5,16 @@ package akka.stream.scaladsl import java.util.concurrent.atomic.AtomicLong +import akka.stream.impl.fusing.{ Op, ActorInterpreter } + import scala.collection.immutable import scala.concurrent.duration._ import akka.actor.{ Props, ActorRefFactory, ActorRef } import akka.stream.{ TransformerLike, MaterializerSettings } import akka.stream.FlowMaterializer -import akka.stream.impl.{ ActorProcessorFactory, StreamSupervisor, ActorBasedFlowMaterializer } -import akka.stream.impl.Ast.{ Transform, AstNode } -import akka.stream.impl.TransformProcessorImpl +import akka.stream.impl.{ ActorProcessorFactory, TransformProcessorImpl, StreamSupervisor, ActorBasedFlowMaterializer } +import akka.stream.impl.Ast.{ Transform, Fusable, AstNode } import akka.stream.testkit.{ StreamTestKit, AkkaSpec } import akka.stream.testkit.ChainSetup import akka.testkit._ @@ -45,6 +46,23 @@ object FlowSpec { } } + class BrokenActorInterpreter( + _settings: MaterializerSettings, + _ops: Seq[Op[_, _, _, _, _]], + brokenMessage: Any) + extends ActorInterpreter(_settings, _ops) { + + import akka.stream.actor.ActorSubscriberMessage._ + + override protected[akka] def aroundReceive(receive: Receive, msg: Any) = { + msg match { + case OnNext(m) if m == brokenMessage ⇒ + throw new NullPointerException(s"I'm so broken [$m]") + case _ ⇒ super.aroundReceive(receive, msg) + } + } + } + class BrokenFlowMaterializer( settings: MaterializerSettings, supervisor: ActorRef, @@ -55,6 +73,7 @@ object FlowSpec { override def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { val props = op match { case t: Transform ⇒ Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage)) + case f: Fusable ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)).withDispatcher(settings.dispatcher) case o ⇒ ActorProcessorFactory.props(this, o) } val impl = actorOf(props, s"$flowName-$n-${op.name}") @@ -144,10 +163,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "deliver error signal when publisher immediately fails" in { new ChainSetup(identity, settings, toPublisher) { object WeirdError extends RuntimeException("weird test exception") - EventFilter[WeirdError.type](occurrences = 1) intercept { - upstreamSubscription.sendError(WeirdError) - downstream.expectError(WeirdError) - } + upstreamSubscription.sendError(WeirdError) + downstream.expectError(WeirdError) } } @@ -521,12 +538,10 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece downstreamSubscription.request(1) upstreamSubscription.expectRequest(1) - EventFilter[TestException.type](occurrences = 2) intercept { - upstreamSubscription.sendNext(5) - upstreamSubscription.expectRequest(1) - upstreamSubscription.expectCancellation() - downstream.expectError(TestException) - } + upstreamSubscription.sendNext(5) + upstreamSubscription.expectRequest(1) + upstreamSubscription.expectCancellation() + downstream.expectError(TestException) val downstream2 = StreamTestKit.SubscriberProbe[String]() publisher.subscribe(downstream2) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 0830199446..de303417ce 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -5,6 +5,8 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicLong +import akka.stream.impl.fusing.{ ActorInterpreter, Op } + import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.{ Await, Future } @@ -41,6 +43,8 @@ private[akka] object Ast { case class TimerTransform(name: String, mkTransformer: () ⇒ TimerTransformer[Any, Any]) extends AstNode + case class Fusable(ops: immutable.Seq[Op[_, _, _, _, _]], name: String) extends AstNode + case class MapAsync(f: Any ⇒ Future[Any]) extends AstNode { override def name = "mapAsync" } @@ -65,18 +69,6 @@ private[akka] object Ast { override def name = "concatFlatten" } - case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends AstNode { - override def name = "conflate" - } - - case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends AstNode { - override def name = "expand" - } - - case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode { - override def name = "buffer" - } - sealed trait JunctionAstNode { def name: String } @@ -326,6 +318,7 @@ private[akka] object ActorProcessorFactory { def props(materializer: FlowMaterializer, op: AstNode): Props = { val settings = materializer.settings (op match { + case Fusable(ops, _) ⇒ Props(new ActorInterpreter(materializer.settings, ops)) case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer())) case m: MapAsync ⇒ Props(new MapAsyncProcessorImpl(settings, m.f)) @@ -334,9 +327,6 @@ private[akka] object ActorProcessorFactory { case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n)) case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) case ConcatAll ⇒ Props(new ConcatAllImpl(materializer)) - case cf: Conflate ⇒ Props(new ConflateImpl(settings, cf.seed, cf.aggregate)) - case ex: Expand ⇒ Props(new ExpandImpl(settings, ex.seed, ex.extrapolate)) - case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy)) }).withDispatcher(settings.dispatcher) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/RateDetachedProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/RateDetachedProcessors.scala deleted file mode 100644 index a18915a40d..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/RateDetachedProcessors.scala +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.stream.impl - -import akka.stream.MaterializerSettings -import akka.stream.OverflowStrategy - -class ConflateImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends ActorProcessorImpl(_settings) { - var conflated: Any = null - - val waitNextZero: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒ - conflated = seed(primaryInputs.dequeueInputElement()) - nextPhase(conflateThenEmit) - } - - val conflateThenEmit: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒ - if (primaryInputs.inputsAvailable) conflated = aggregate(conflated, primaryInputs.dequeueInputElement()) - if (primaryOutputs.demandAvailable) { - primaryOutputs.enqueueOutputElement(conflated) - conflated = null - nextPhase(waitNextZero) - } - } - - nextPhase(waitNextZero) -} - -class ExpandImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends ActorProcessorImpl(_settings) { - var extrapolateState: Any = null - - val waitFirst: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒ - extrapolateState = seed(primaryInputs.dequeueInputElement()) - nextPhase(emitFirst) - } - - val emitFirst: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - emitExtrapolate() - nextPhase(extrapolateOrReset) - } - - val extrapolateOrReset: TransferPhase = TransferPhase(primaryInputs.NeedsInputOrComplete || primaryOutputs.NeedsDemand) { () ⇒ - if (primaryInputs.inputsDepleted) nextPhase(completedPhase) - else if (primaryInputs.inputsAvailable) { - extrapolateState = seed(primaryInputs.dequeueInputElement()) - nextPhase(emitFirst) - } else emitExtrapolate() - } - - def emitExtrapolate(): Unit = { - val (emit, nextState) = extrapolate(extrapolateState) - primaryOutputs.enqueueOutputElement(emit) - extrapolateState = nextState - } - - nextPhase(waitFirst) -} - -class BufferImpl(_settings: MaterializerSettings, size: Int, overflowStrategy: OverflowStrategy) extends ActorProcessorImpl(_settings) { - import OverflowStrategy._ - - val buffer = FixedSizeBuffer(size) - - val dropAction: () ⇒ Unit = overflowStrategy match { - case DropHead ⇒ buffer.dropHead - case DropTail ⇒ buffer.dropTail - case DropBuffer ⇒ buffer.clear - case Error ⇒ () ⇒ fail(new Error.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) - case Backpressure ⇒ () ⇒ nextPhase(bufferFull) - } - - val bufferEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒ - buffer.enqueue(primaryInputs.dequeueInputElement()) - nextPhase(bufferNonEmpty) - } - - val bufferNonEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒ - if (primaryOutputs.demandAvailable) { - primaryOutputs.enqueueOutputElement(buffer.dequeue()) - if (buffer.isEmpty) nextPhase(bufferEmpty) - } else { - if (buffer.isFull) dropAction() - else buffer.enqueue(primaryInputs.dequeueInputElement()) - } - } - - val bufferFull: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - primaryOutputs.enqueueOutputElement(buffer.dequeue()) - if (buffer.isEmpty) nextPhase(bufferEmpty) - else nextPhase(bufferNonEmpty) - } - - nextPhase(bufferEmpty) -} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala new file mode 100644 index 0000000000..9cfef375cd --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -0,0 +1,273 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import java.util.Arrays + +import akka.actor.{ Actor, ActorRef } +import akka.event.Logging +import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants } +import akka.stream.actor.ActorSubscriber.OnSubscribe +import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete } +import akka.stream.impl._ +import org.reactivestreams.{ Subscriber, Subscription } + +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundaryOp { + require(size > 0, "buffer size cannot be zero") + require((size & (size - 1)) == 0, "buffer size must be a power of two") + + // TODO: buffer and batch sizing heuristics + private var upstream: Subscription = _ + private val inputBuffer = Array.ofDim[AnyRef](size) + private var inputBufferElements = 0 + private var nextInputElementCursor = 0 + private var upstreamCompleted = false + private var downstreamWaiting = false + private val IndexMask = size - 1 + + private def requestBatchSize = math.max(1, inputBuffer.length / 2) + private var batchRemaining = requestBatchSize + + val subreceive: SubReceive = new SubReceive(waitingForUpstream) + + private def dequeue(): Any = { + val elem = inputBuffer(nextInputElementCursor) + assert(elem ne null) + inputBuffer(nextInputElementCursor) = null + + batchRemaining -= 1 + if (batchRemaining == 0 && !upstreamCompleted) { + upstream.request(requestBatchSize) + batchRemaining = requestBatchSize + } + + inputBufferElements -= 1 + nextInputElementCursor = (nextInputElementCursor + 1) & IndexMask + elem + } + + private def enqueue(elem: Any): Unit = { + if (!upstreamCompleted) { + if (inputBufferElements == size) throw new IllegalStateException("Input buffer overrun") + inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef] + inputBufferElements += 1 + } + } + + override def onPush(elem: Any, ctxt: BoundaryContext): Directive = + throw new UnsupportedOperationException("BUG: Cannot push the upstream boundary") + + override def onPull(ctxt: BoundaryContext): Directive = { + if (inputBufferElements > 1) ctxt.push(dequeue()) + else if (inputBufferElements == 1) { + if (upstreamCompleted) ctxt.pushAndFinish(dequeue()) + else ctxt.push(dequeue()) + } else if (upstreamCompleted) { + ctxt.finish() + } else { + downstreamWaiting = true + ctxt.exit() + } + } + + override def onDownstreamFinish(ctxt: BoundaryContext): Directive = { + cancel() + ctxt.exit() + } + + def cancel(): Unit = { + if (!upstreamCompleted) { + upstreamCompleted = true + if (upstream ne null) upstream.cancel() + downstreamWaiting = false + clear() + } + } + + private def clear(): Unit = { + Arrays.fill(inputBuffer, 0, inputBuffer.length, null) + inputBufferElements = 0 + } + + private def onComplete(): Unit = { + upstreamCompleted = true + subreceive.become(completed) + if (inputBufferElements == 0) enter().finish() + } + + private def onSubscribe(subscription: Subscription): Unit = { + assert(subscription != null) + upstream = subscription + // Prefetch + upstream.request(inputBuffer.length) + subreceive.become(upstreamRunning) + } + + private def onError(e: Throwable): Unit = { + upstreamCompleted = true + subreceive.become(completed) + enter().fail(e) + } + + private def waitingForUpstream: Actor.Receive = { + case OnComplete ⇒ onComplete() + case OnSubscribe(subscription) ⇒ onSubscribe(subscription) + case OnError(cause) ⇒ onError(cause) + } + + private def upstreamRunning: Actor.Receive = { + case OnNext(element) ⇒ + enqueue(element) + if (downstreamWaiting) { + downstreamWaiting = false + enter().push(dequeue()) + } + + case OnComplete ⇒ onComplete() + case OnError(cause) ⇒ onError(cause) + case OnSubscribe(subscription) ⇒ subscription.cancel() // spec rule 2.5 + } + + private def completed: Actor.Receive = { + case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") + } + +} + +/** + * INTERNAL API + */ +private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryOp { + + private var exposedPublisher: ActorPublisher[Any] = _ + + private var subscriber: Subscriber[Any] = _ + private var downstreamDemand: Long = 0L + // This flag is only used if complete/fail is called externally since this op turns into a Finished one inside the + // interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked) + private var downstreamCompleted = false + private var upstreamWaiting = true + + val subreceive = new SubReceive(waitingExposedPublisher) + + private def onNext(elem: Any): Unit = { + downstreamDemand -= 1 + subscriber.onNext(elem) + } + + private def complete(): Unit = { + if (!downstreamCompleted) { + downstreamCompleted = true + if (subscriber ne null) subscriber.onComplete() + if (exposedPublisher ne null) exposedPublisher.shutdown(None) + } + } + + def fail(e: Throwable): Unit = { + if (!downstreamCompleted) { + downstreamCompleted = true + if (subscriber ne null) subscriber.onError(e) + if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) + } + } + + override def onPush(elem: Any, ctxt: BoundaryContext): Directive = { + onNext(elem) + if (downstreamDemand > 0) ctxt.pull() + else if (downstreamCompleted) ctxt.finish() + else { + upstreamWaiting = true + ctxt.exit() + } + } + + override def onPull(ctxt: BoundaryContext): Directive = + throw new UnsupportedOperationException("BUG: Cannot pull the downstream boundary") + + override def onUpstreamFinish(ctxt: BoundaryContext): Directive = { + complete() + ctxt.finish() + } + + override def onFailure(cause: Throwable, ctxt: BoundaryContext): Directive = { + fail(cause) + ctxt.fail(cause) + } + + private def subscribePending(subscribers: Seq[Subscriber[Any]]): Unit = + subscribers foreach { sub ⇒ + if (subscriber eq null) { + subscriber = sub + subscriber.onSubscribe(new ActorSubscription(actor, subscriber)) + } else sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}")) + } + + protected def waitingExposedPublisher: Actor.Receive = { + case ExposedPublisher(publisher) ⇒ + exposedPublisher = publisher + subreceive.become(downstreamRunning) + case other ⇒ + throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]") + } + + protected def downstreamRunning: Actor.Receive = { + case SubscribePending ⇒ + subscribePending(exposedPublisher.takePendingSubscribers()) + case RequestMore(subscription, elements) ⇒ + + // TODO centralize overflow protection + downstreamDemand += elements + if (downstreamDemand < 0) { + // Long has overflown + val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue) + enter().finish() + fail(demandOverflowException) + } else if (upstreamWaiting) { + upstreamWaiting = false + enter().pull() + } + + case Cancel(subscription) ⇒ + downstreamCompleted = true + subscriber = null + exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException)) + enter().finish() + } + +} + +/** + * INTERNAL API + */ +private[akka] class ActorInterpreter(val settings: MaterializerSettings, val ops: Seq[Op[_, _, _, _, _]]) + extends Actor { + + private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize) + private val downstream = new ActorOutputBoundary(self) + private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream) + interpreter.init() + + def receive: Receive = upstream.subreceive orElse downstream.subreceive + + override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = { + super.aroundReceive(receive, msg) + if (interpreter.isFinished) context.stop(self) + } + + override def postStop(): Unit = { + upstream.cancel() + downstream.fail(new IllegalStateException("Processor actor terminated abruptly")) + } + + override def postRestart(reason: Throwable): Unit = { + super.postRestart(reason) + throw new IllegalStateException("This actor cannot be restarted", reason) + } + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala new file mode 100644 index 0000000000..7ba9f3f573 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -0,0 +1,511 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import scala.annotation.tailrec +import scala.util.control.NonFatal + +// TODO: +// fix jumpback table with keep-going-on-complete ops (we might jump between otherwise isolated execution regions) +// implement grouped, buffer +// add recover + +trait Op[In, Out, PushD <: Directive, PullD <: Directive, Ctxt <: Context[Out]] { + private[fusing] var holding = false + private[fusing] var allowedToPush = false + private[fusing] var terminationPending = false + + def isHolding: Boolean = holding + def isFinishing: Boolean = terminationPending + def onPush(elem: In, ctxt: Ctxt): PushD + def onPull(ctxt: Ctxt): PullD + def onUpstreamFinish(ctxt: Ctxt): Directive = ctxt.finish() + def onDownstreamFinish(ctxt: Ctxt): Directive = ctxt.finish() + def onFailure(cause: Throwable, ctxt: Ctxt): Directive = ctxt.fail(cause) +} + +trait DeterministicOp[In, Out] extends Op[In, Out, Directive, Directive, Context[Out]] +trait DetachedOp[In, Out] extends Op[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out]] +trait BoundaryOp extends Op[Any, Any, Directive, Directive, BoundaryContext] { + private[fusing] var bctxt: BoundaryContext = _ + def enter(): BoundaryContext = bctxt +} + +trait TransitivePullOp[In, Out] extends DeterministicOp[In, Out] { + final override def onPull(ctxt: Context[Out]): Directive = ctxt.pull() +} + +sealed trait Directive +sealed trait UpstreamDirective extends Directive +sealed trait DownstreamDirective extends Directive +sealed trait TerminationDirective extends Directive +final class FreeDirective extends UpstreamDirective with DownstreamDirective with TerminationDirective + +sealed trait Context[Out] { + def push(elem: Out): DownstreamDirective + def pull(): UpstreamDirective + def finish(): FreeDirective + def pushAndFinish(elem: Out): DownstreamDirective + def fail(cause: Throwable): FreeDirective + def absorbTermination(): TerminationDirective +} + +trait DetachedContext[Out] extends Context[Out] { + def hold(): FreeDirective + def pushAndPull(elem: Out): FreeDirective +} + +trait BoundaryContext extends Context[Any] { + def exit(): FreeDirective +} + +object OneBoundedInterpreter { + final val PhantomDirective = null + + /** + * INTERNAL API + * + * This artificial op is used as a boundary to prevent two forked paths of execution (complete, cancel) to cross + * paths again. When finishing an op this op is injected in its place to isolate upstream and downstream execution + * domains. + */ + private[akka] object Finished extends BoundaryOp { + override def onPush(elem: Any, ctxt: BoundaryContext): UpstreamDirective = ctxt.finish() + override def onPull(ctxt: BoundaryContext): DownstreamDirective = ctxt.finish() + override def onUpstreamFinish(ctxt: BoundaryContext): Directive = ctxt.exit() + override def onDownstreamFinish(ctxt: BoundaryContext): Directive = ctxt.exit() + override def onFailure(cause: Throwable, ctxt: BoundaryContext): Directive = ctxt.exit() + } +} + +/** + * One-bounded interpreter for a linear chain of stream operations (graph support is possible and will be implemented + * later) + * + * The ideas in this interpreter are an amalgamation of earlier ideas, notably: + * - The original effect-tracking implementation by Johannes Rudolph -- the difference here that effects are not chained + * together as classes but the callstack is used instead and only certain combinations are allowed. + * - The on-stack reentrant implementation by Mathias Doenitz -- the difference here that reentrancy is handled by the + * interpreter itself, not user code, and the interpreter is able to use the heap when needed instead of the + * callstack. + * - The pinball interpreter by Endre Sándor Varga -- the difference here that the restricition for "one ball" is + * lifted by using isolated execution regions, completion handling is introduced and communication with the external + * world is done via boundary ops. + * + * The design goals/features of this interpreter are: + * - bounded callstack and heapless execution whenever possible + * - callstack usage should be constant for the most common ops independently of the size of the op-chain + * - allocation-free execution on the hot paths + * - enforced backpressure-safety (boundedness) on user defined ops at compile-time (and runtime in a few cases) + * + * The main driving idea of this interpreter is the concept of 1-bounded execution of well-formed free choice Petri + * nets (J. Desel and J. Esparza: Free Choice Petri Nets - https://www7.in.tum.de/~esparza/bookfc.html). Technically + * different kinds of operations partition the chain of ops into regions where *exactly one* event is active all the + * time. This "exactly one" property is enforced by proper types and runtime checks where needed. Currently there are + * three kinds of ops: + * + * - DeterministicOp implementations participate in 1-bounded regions. For every external non-completion signal these + * ops produce *exactly one* signal (completion is different, explained later) therefore keeping the number of events + * the same: exactly one. + * + * - DetachedOp implementations are boundaries between 1-bounded regions. This means that they need to enforce the + * "exactly one" property both on their upstream and downstream regions. As a consequence a DetachedOp can never + * answer an onPull with a ctxt.pull() or answer an onPush() with a ctxt.push() since such an action would "steal" + * the event from one region (resulting in zero signals) and would inject it to the other region (resulting in two + * signals). However DetachedOps have the ability to call ctxt.hold() as a response to onPush/onPull which temporarily + * takes the signal off and stops execution, at the same time putting the op in a "holding" state. If the op is in a + * holding state it contains one absorbed signal, therefore in this state the only possible command to call is + * ctxt.pushAndPull() which results in two events making the balance right again: + * 1 hold + 1 external event = 2 external event + * This mechanism allows synchronization between the upstream and downstream regions which otherwise can progress + * independently. + * + * - BoundaryOp implementations are meant to communicate with the external world. These ops do not have most of the + * safety properties enforced and should be used carefully. One important ability of BoundaryOps that they can take + * off an execution signal by calling ctxt.exit(). This is typically used immediately after an external signal has + * been produced (for example an actor message). BoundaryOps can also kickstart execution by calling enter() which + * returns a context they can use to inject signals into the interpreter. There is no checks in place to enforce that + * the number of signals taken out by exit() and the number of signals returned via enter() are the same -- using this + * op type needs extra care from the implementer. + * BoundaryOps are the elements that make the interpreter *tick*, there is no other way to start the interpreter + * than using a BoundaryOp. + * + * Operations are allowed to do early completion and cancel/complete their upstreams and downstreams. It is *not* + * allowed however to do these independently to avoid isolated execution islands. The only call possible is ctxt.finish() + * which is a combination of cancel/complete. + * Since onComplete is not a backpressured signal it is sometimes preferable to push a final element and then immediately + * finish. This combination is exposed as pushAndFinish() which enables op writers to propagate completion events without + * waiting for an extra round of pull. + * Another peculiarity is how to convert termination events (complete/failure) into elements. The problem + * here is that the termination events are not backpressured while elements are. This means that simply calling ctxt.push() + * as a response to onUpstreamFinished() will very likely break boundedness and result in a buffer overflow somewhere. + * Therefore the only allowed command in this case is ctxt.absorbTermination() which stops the propagation of the + * termination signal, and puts the op in a finishing state. Depending on whether the op has a pending pull signal it has + * not yet "consumed" by a push its onPull() handler might be called immediately. + * + * In order to execute different individual execution regions the interpreter uses the callstack to schedule these. The + * current execution forking operations are + * - ctxt.finish() which starts a wave of completion and cancellation in two directions. When an op calls finish() + * it is immediately replaced by an artificial Finished op which makes sure that the two execution paths are isolated + * forever. + * - ctxt.fail() which is similar to finish() + * - ctxt.pushAndPull() which (as a response to a previous ctxt.hold()) starts a wawe of downstream push and upstream + * pull. The two execution paths are isolated by the op itself since onPull() from downstream can only answered by hold or + * push, while onPush() from upstream can only answered by hold or pull -- it is impossible to "cross" the op. + * - ctxt.pushAndFinish() which is different from the forking ops above because the execution of push and finish happens on + * the same execution region and they are order dependent, too. + * The interpreter tracks the depth of recursive forking and allows various strategies of dealing with the situation + * when this depth reaches a certain limit. In the simplest case an error is reported (this is very useful for stress + * testing and finding callstack wasting bugs), in the other case the forked call is scheduled via a list -- i.e. instead + * of the stack the heap is used. + */ +class OneBoundedInterpreter(ops: Seq[Op[_, _, _, _, _]], val forkLimit: Int = 100, val overflowToHeap: Boolean = true) { + import OneBoundedInterpreter._ + type UntypedOp = Op[Any, Any, Directive, Directive, DetachedContext[Any]] + require(ops.nonEmpty, "OneBoundedInterpreter cannot be created without at least one Op") + + private val pipeline = ops.toArray.asInstanceOf[Array[UntypedOp]] + + /** + * This table is used to accelerate demand propagation upstream. All ops that implement TransitivePullOp are guaranteed + * to only do upstream propagation of demand signals, therefore it is not necessary to execute them but enough to + * "jump over" them. This means that when a chain of one million maps gets a downstream demand it is propagated + * to the upstream *in one step* instead of one million onPull() calls. + * This table maintains the positions where execution should jump from a current position when a pull event is to + * be executed. + */ + private val jumpBacks: Array[Int] = calculateJumpBacks + + private val Upstream = 0 + private val Downstream = pipeline.length - 1 + + // Var to hold the current element if pushing. The only reason why this var is needed is to avoid allocations and + // make it possible for the Pushing state to be an object + private var elementInFlight: Any = _ + // Points to the current point of execution inside the pipeline + private var activeOp = -1 + // The current interpreter state that decides what happens at the next round + private var state: State = Pushing + + // Counter that keeps track of the depth of recursive forked executions + private var forkCount = 0 + // List that is used as an auxiliary stack if fork recursion depth reaches forkLimit + private var overflowStack = List.empty[(Int, State, Any)] + + // see the jumpBacks variable for explanation + private def calculateJumpBacks: Array[Int] = { + val table = Array.ofDim[Int](pipeline.length) + var nextJumpBack = -1 + for (pos ← 0 until pipeline.length) { + table(pos) = nextJumpBack + if (!pipeline(pos).isInstanceOf[TransitivePullOp[_, _]]) nextJumpBack = pos + } + table + } + + private sealed trait State extends DetachedContext[Any] with BoundaryContext { + def advance(): Unit + + override def push(elem: Any): DownstreamDirective = { + if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot push while holding, only pushAndPull") + pipeline(activeOp).allowedToPush = false + elementInFlight = elem + state = Pushing + PhantomDirective + } + + override def pull(): UpstreamDirective = { + if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot pull while holding, only pushAndPull") + pipeline(activeOp).allowedToPush = !pipeline(activeOp).isInstanceOf[DetachedOp[_, _]] + state = Pulling + PhantomDirective + } + + override def finish(): FreeDirective = { + fork(Completing) + state = Cancelling + PhantomDirective + } + + override def pushAndFinish(elem: Any): DownstreamDirective = { + pipeline(activeOp) = Finished.asInstanceOf[UntypedOp] + // This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution + // path. Other forks are not order dependent because they execute on isolated execution domains which cannot + // "cross paths". This unsafeFork is relatively safe here because PushAndFinish simply absorbs all later downstream + // calls of pushAndFinish since the finish event has been scheduled already. + // It might be that there are some degenerate cases where this can blow up the stack with a very long chain but I + // am not aware of such scenario yet. If you know one, put it in InterpreterStressSpec :) + unsafeFork(PushFinish, elem) + elementInFlight = null + finish() + } + + override def fail(cause: Throwable): FreeDirective = { + fork(Failing(cause)) + state = Cancelling + PhantomDirective + } + + override def hold(): FreeDirective = { + if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot hold while already holding") + pipeline(activeOp).holding = true + exit() + } + + override def pushAndPull(elem: Any): FreeDirective = { + if (!pipeline(activeOp).holding) throw new IllegalStateException("Cannot pushAndPull without holding first") + pipeline(activeOp).holding = false + fork(Pushing, elem) + state = Pulling + PhantomDirective + } + + override def absorbTermination(): TerminationDirective = { + pipeline(activeOp).holding = false + finish() + } + + override def exit(): FreeDirective = { + elementInFlight = null + activeOp = -1 + PhantomDirective + } + } + + private object Pushing extends State { + override def advance(): Unit = { + activeOp += 1 + pipeline(activeOp).onPush(elementInFlight, ctxt = this) + } + } + + private object PushFinish extends State { + override def advance(): Unit = { + activeOp += 1 + pipeline(activeOp).onPush(elementInFlight, ctxt = this) + } + + override def pushAndFinish(elem: Any): DownstreamDirective = { + elementInFlight = elem + state = PushFinish + PhantomDirective + } + + override def finish(): FreeDirective = { + state = Completing + PhantomDirective + } + } + + private object Pulling extends State { + override def advance(): Unit = { + elementInFlight = null + activeOp = jumpBacks(activeOp) + pipeline(activeOp).onPull(ctxt = this) + } + + override def hold(): FreeDirective = { + super.hold() + pipeline(activeOp).allowedToPush = true + PhantomDirective + } + } + + private object Completing extends State { + override def advance(): Unit = { + elementInFlight = null + pipeline(activeOp) = Finished.asInstanceOf[UntypedOp] + activeOp += 1 + if (!pipeline(activeOp).isFinishing) pipeline(activeOp).onUpstreamFinish(ctxt = this) + else exit() + } + + override def finish(): FreeDirective = { + state = Completing + PhantomDirective + } + + override def absorbTermination(): TerminationDirective = { + pipeline(activeOp).terminationPending = true + pipeline(activeOp).holding = false + // FIXME: This state is potentially corrupted by the jumpBackTable (not updated when jumping over) + if (pipeline(activeOp).allowedToPush) pipeline(activeOp).onPull(ctxt = Pulling) + else exit() + PhantomDirective + } + } + + private object Cancelling extends State { + override def advance(): Unit = { + elementInFlight = null + pipeline(activeOp) = Finished.asInstanceOf[UntypedOp] + activeOp -= 1 + if (!pipeline(activeOp).isFinishing) pipeline(activeOp).onDownstreamFinish(ctxt = this) + else exit() + } + + override def finish(): FreeDirective = { + state = Cancelling + PhantomDirective + } + } + + private final case class Failing(cause: Throwable) extends State { + override def advance(): Unit = { + elementInFlight = null + pipeline(activeOp) = Finished.asInstanceOf[UntypedOp] + activeOp += 1 + pipeline(activeOp).onFailure(cause, ctxt = this) + } + + override def absorbTermination(): TerminationDirective = { + pipeline(activeOp).terminationPending = true + pipeline(activeOp).holding = false + if (pipeline(activeOp).allowedToPush) pipeline(activeOp).onPull(ctxt = Pulling) + else exit() + PhantomDirective + } + } + + @tailrec private def execute(): Unit = { + while (activeOp > -1 && activeOp < pipeline.length) { + try { + state.advance() + } catch { + case NonFatal(e) ⇒ + try { + state.fail(e) + } catch { + case NonFatal(_) ⇒ + // TODO: Make pipeline all failed + throw new IllegalStateException("Double Fault: Failure while handling failure", e) + } + } + } + + // Execute all delayed forks that were put on the heap if the fork limit has been reached + if (overflowStack.nonEmpty) { + val memo = overflowStack.head + activeOp = memo._1 + state = memo._2 + elementInFlight = memo._3 + overflowStack = overflowStack.tail + execute() + } + } + + /** + * Forks off execution of the pipeline by saving current position, fully executing the effects of the given + * forkState then setting back the position to the saved value. + * By default forking is executed by using the callstack. If the depth of forking ever reaches the configured forkLimit + * this method either fails (useful for testing) or starts using the heap instead of the callstack to avoid a + * stack overflow. + */ + private def fork(forkState: State, elem: Any = null): Unit = { + forkCount += 1 + if (forkCount == forkLimit) { + if (!overflowToHeap) throw new IllegalStateException("Fork limit reached") + else overflowStack ::= ((activeOp, forkState, elem)) + } else unsafeFork(forkState, elem) + forkCount -= 1 + } + + /** + * Unsafe fork always uses the stack for execution. This call is needed by pushAndComplete where the forked execution + * is order dependent since the push and complete events travel in the same direction and not isolated by a boundary + */ + private def unsafeFork(forkState: State, elem: Any = null): Unit = { + val savePos = activeOp + elementInFlight = elem + state = forkState + execute() + activeOp = savePos + PhantomDirective + } + + def init(): Unit = { + initBoundaries() + runDetached() + } + + def isFinished: Boolean = pipeline(Upstream) == Finished && pipeline(Downstream) == Finished + + /** + * This method injects a Context to each of the BoundaryOps. This will be the context returned by enter(). + */ + private def initBoundaries(): Unit = { + var op = 0 + while (op < pipeline.length) { + if (pipeline(op).isInstanceOf[BoundaryOp]) { + pipeline(op).asInstanceOf[BoundaryOp].bctxt = new State { + val entryPoint = op + + override def advance(): Unit = () + + override def push(elem: Any): DownstreamDirective = { + activeOp = entryPoint + super.push(elem) + execute() + PhantomDirective + } + + override def pull(): UpstreamDirective = { + activeOp = entryPoint + super.pull() + execute() + PhantomDirective + } + + override def finish(): FreeDirective = { + activeOp = entryPoint + super.finish() + execute() + PhantomDirective + } + + override def fail(cause: Throwable): FreeDirective = { + activeOp = entryPoint + super.fail(cause) + execute() + PhantomDirective + } + + override def hold(): FreeDirective = { + activeOp = entryPoint + super.hold() + execute() + PhantomDirective + } + + override def pushAndPull(elem: Any): FreeDirective = { + activeOp = entryPoint + super.pushAndPull(elem) + execute() + PhantomDirective + } + } + } + op += 1 + } + } + + /** + * Starts execution of detached regions. + * + * Since detached ops partition the pipeline into different 1-bounded domains is is necessary to inject a starting + * signal into these regions (since there is no external signal that would kick off their execution otherwise). + */ + private def runDetached(): Unit = { + var op = pipeline.length - 1 + while (op >= 0) { + if (pipeline(op).isInstanceOf[DetachedOp[_, _]]) { + activeOp = op + state = Pulling + execute() + } + op -= 1 + } + } + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala new file mode 100644 index 0000000000..92386c6f16 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -0,0 +1,240 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import akka.stream.OverflowStrategy +import akka.stream.impl.FixedSizeBuffer + +import scala.collection.immutable + +/** + * INTERNAL API + */ +private[akka] case class Map[In, Out](f: In ⇒ Out) extends TransitivePullOp[In, Out] { + override def onPush(elem: In, ctxt: Context[Out]): Directive = ctxt.push(f(elem)) +} + +/** + * INTERNAL API + */ +private[akka] case class Filter[T](p: T ⇒ Boolean) extends TransitivePullOp[T, T] { + override def onPush(elem: T, ctxt: Context[T]): Directive = + if (p(elem)) ctxt.push(elem) + else ctxt.pull() +} + +/** + * INTERNAL API + */ +private[akka] case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extends DeterministicOp[In, Out] { + private var currentIterator: Iterator[Out] = Iterator.empty + + override def onPush(elem: In, ctxt: Context[Out]): Directive = { + currentIterator = f(elem).iterator + if (currentIterator.isEmpty) ctxt.pull() + else ctxt.push(currentIterator.next()) + } + + override def onPull(ctxt: Context[Out]): Directive = + if (currentIterator.hasNext) ctxt.push(currentIterator.next()) + else ctxt.pull() +} + +/** + * INTERNAL API + */ +private[akka] case class Take[T](count: Int) extends TransitivePullOp[T, T] { + private var left: Int = count + + override def onPush(elem: T, ctxt: Context[T]): Directive = { + left -= 1 + if (left == 0) ctxt.pushAndFinish(elem) + else ctxt.push(elem) + } +} + +/** + * INTERNAL API + */ +private[akka] case class Drop[T](count: Int) extends TransitivePullOp[T, T] { + private var left: Int = count + override def onPush(elem: T, ctxt: Context[T]): Directive = + if (left > 0) { + left -= 1 + ctxt.pull() + } else ctxt.push(elem) +} + +/** + * INTERNAL API + */ +private[akka] case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends DeterministicOp[In, Out] { + private var aggregator = zero + + override def onPush(elem: In, ctxt: Context[Out]): Directive = { + aggregator = f(aggregator, elem) + ctxt.pull() + } + + override def onPull(ctxt: Context[Out]): Directive = + if (isFinishing) ctxt.pushAndFinish(aggregator) + else ctxt.pull() + + override def onUpstreamFinish(ctxt: Context[Out]): Directive = ctxt.absorbTermination() +} + +/** + * INTERNAL API + */ +private[akka] case class Grouped[T](n: Int) extends DeterministicOp[T, immutable.Seq[T]] { + private var buf: Vector[T] = Vector.empty + + override def onPush(elem: T, ctxt: Context[immutable.Seq[T]]): Directive = { + buf :+= elem + if (buf.size == n) { + val emit = buf + buf = Vector.empty + ctxt.push(emit) + } else ctxt.pull() + } + + override def onPull(ctxt: Context[immutable.Seq[T]]): Directive = + if (isFinishing) ctxt.pushAndFinish(buf) + else ctxt.pull() + + override def onUpstreamFinish(ctxt: Context[immutable.Seq[T]]): Directive = + if (buf.isEmpty) ctxt.finish() + else ctxt.absorbTermination() +} + +/** + * INTERNAL API + */ +private[akka] case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedOp[T, T] { + import OverflowStrategy._ + + private val buffer = FixedSizeBuffer(size) + + override def onPush(elem: T, ctxt: DetachedContext[T]): UpstreamDirective = + if (isHolding) ctxt.pushAndPull(elem) + else enqueueAction(ctxt, elem) + + override def onPull(ctxt: DetachedContext[T]): DownstreamDirective = { + if (isFinishing) { + val elem = buffer.dequeue().asInstanceOf[T] + if (buffer.isEmpty) ctxt.pushAndFinish(elem) + else ctxt.push(elem) + } else if (isHolding) ctxt.pushAndPull(buffer.dequeue().asInstanceOf[T]) + else if (buffer.isEmpty) ctxt.hold() + else ctxt.push(buffer.dequeue().asInstanceOf[T]) + } + + override def onUpstreamFinish(ctxt: DetachedContext[T]): Directive = + if (buffer.isEmpty) ctxt.finish() + else ctxt.absorbTermination() + + val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = { + overflowStrategy match { + case DropHead ⇒ { (ctxt, elem) ⇒ + if (buffer.isFull) buffer.dropHead() + buffer.enqueue(elem) + ctxt.pull() + } + case DropTail ⇒ { (ctxt, elem) ⇒ + if (buffer.isFull) buffer.dropTail() + buffer.enqueue(elem) + ctxt.pull() + } + case DropBuffer ⇒ { (ctxt, elem) ⇒ + if (buffer.isFull) buffer.clear() + buffer.enqueue(elem) + ctxt.pull() + } + case Backpressure ⇒ { (ctxt, elem) ⇒ + buffer.enqueue(elem) + if (buffer.isFull) ctxt.hold() + else ctxt.pull() + } + case Error ⇒ { (ctxt, elem) ⇒ + if (buffer.isFull) ctxt.fail(new Error.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) + else { + buffer.enqueue(elem) + ctxt.pull() + } + } + } + } +} + +/** + * INTERNAL API + */ +private[akka] case class Completed[T]() extends DeterministicOp[T, T] { + override def onPush(elem: T, ctxt: Context[T]): Directive = ctxt.finish() + override def onPull(ctxt: Context[T]): Directive = ctxt.finish() +} + +/** + * INTERNAL API + */ +private[akka] case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedOp[In, Out] { + private var agg: Any = null + + override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = { + if (agg == null) agg = seed(elem) + else agg = aggregate(agg.asInstanceOf[Out], elem) + + if (!isHolding) ctxt.pull() else { + val result = agg.asInstanceOf[Out] + agg = null + ctxt.pushAndPull(result) + } + } + + override def onPull(ctxt: DetachedContext[Out]): DownstreamDirective = { + if (isFinishing) { + if (agg == null) ctxt.finish() + else { + val result = agg.asInstanceOf[Out] + agg = null + ctxt.pushAndFinish(result) + } + } else if (agg == null) ctxt.hold() + else { + val result = agg.asInstanceOf[Out] + agg = null + ctxt.push(result) + } + } + + override def onUpstreamFinish(ctxt: DetachedContext[Out]): Directive = ctxt.absorbTermination() +} + +/** + * INTERNAL API + */ +private[akka] case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: Seed ⇒ (Out, Seed)) extends DetachedOp[In, Out] { + private var s: Any = null + + override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = { + s = seed(elem) + if (isHolding) { + val (emit, newS) = extrapolate(s.asInstanceOf[Seed]) + s = newS + ctxt.pushAndPull(emit) + } else ctxt.hold() + } + + override def onPull(ctxt: DetachedContext[Out]): DownstreamDirective = { + if (s == null) ctxt.hold() + else { + val (emit, newS) = extrapolate(s.asInstanceOf[Seed]) + s = newS + if (isHolding) { + ctxt.pushAndPull(emit) + } else ctxt.push(emit) + } + + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 0d61d45547..17670f35d2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -4,6 +4,7 @@ package akka.stream.scaladsl import akka.stream.impl.Ast._ +import akka.stream.impl.fusing import akka.stream.{ TimerTransformer, Transformer, OverflowStrategy } import akka.util.Collections.EmptyImmutableSeq import scala.collection.immutable @@ -101,24 +102,19 @@ trait RunnableFlow { trait FlowOps[+Out] { import FlowOps._ type Repr[+O] + import akka.stream.impl.fusing /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. */ - def map[T](f: Out ⇒ T): Repr[T] = - transform("map", () ⇒ new Transformer[Out, T] { - def onNext(in: Out) = List(f(in)) - }) + def map[T](f: Out ⇒ T): Repr[T] = andThen(Fusable(Vector(fusing.Map(f)), "map")) /** * Transform each input element into a sequence of output elements that is * then flattened into the output stream. */ - def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = - transform("mapConcat", () ⇒ new Transformer[Out, T] { - def onNext(in: Out) = f(in) - }) + def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = andThen(Fusable(Vector(fusing.MapConcat(f)), "mapConcat")) /** * Transform this stream by applying the given function to each of the elements @@ -148,20 +144,16 @@ trait FlowOps[+Out] { /** * Only pass on those elements that satisfy the given predicate. */ - def filter(p: Out ⇒ Boolean): Repr[Out] = - transform("filter", () ⇒ new Transformer[Out, Out] { - def onNext(in: Out) = if (p(in)) List(in) else Nil - }) + def filter(p: Out ⇒ Boolean): Repr[Out] = andThen(Fusable(Vector(fusing.Filter(p)), "filter")) /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. * Non-matching elements are filtered out. */ - def collect[T](pf: PartialFunction[Out, T]): Repr[T] = - transform("collect", () ⇒ new Transformer[Out, T] { - def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil - }) + def collect[T](pf: PartialFunction[Out, T]): Repr[T] = andThen(Fusable(Vector( + fusing.Filter(pf.isDefinedAt), + fusing.Map(pf.apply)), "filter")) /** * Chunk up this stream into groups of the given size, with the last group @@ -171,19 +163,7 @@ trait FlowOps[+Out] { */ def grouped(n: Int): Repr[immutable.Seq[Out]] = { require(n > 0, "n must be greater than 0") - transform("grouped", () ⇒ new Transformer[Out, immutable.Seq[Out]] { - var buf: Vector[Out] = Vector.empty - def onNext(in: Out) = { - buf :+= in - if (buf.size == n) { - val group = buf - buf = Vector.empty - List(group) - } else - Nil - } - override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) - }) + andThen(Fusable(Vector(fusing.Grouped(n)), "grouped")) } /** @@ -228,21 +208,8 @@ trait FlowOps[+Out] { * No elements will be dropped if `n` is zero or negative. */ def drop(n: Int): Repr[Out] = - transform("drop", () ⇒ new Transformer[Out, Out] { - var delegate: Transformer[Out, Out] = - if (n <= 0) identityTransformer.asInstanceOf[Transformer[Out, Out]] - else new Transformer[Out, Out] { - var c = n - def onNext(in: Out) = { - c -= 1 - if (c == 0) - delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] - Nil - } - } - - def onNext(in: Out) = delegate.onNext(in) - }) + if (n <= 0) andThen(Fusable(Vector.empty, "drop")) + else andThen(Fusable(Vector(fusing.Drop(n)), "drop")) /** * Discard the elements received within the given duration at beginning of the stream. @@ -273,22 +240,8 @@ trait FlowOps[+Out] { * or negative. */ def take(n: Int): Repr[Out] = - transform("take", () ⇒ new Transformer[Out, Out] { - var delegate: Transformer[Out, Out] = - if (n <= 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] - else new Transformer[Out, Out] { - var c = n - def onNext(in: Out) = { - c -= 1 - if (c == 0) - delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] - List(in) - } - } - - def onNext(in: Out) = delegate.onNext(in) - override def isComplete = delegate.isComplete - }) + if (n <= 0) andThen(Fusable(Vector(fusing.Completed()), "take")) + else andThen(Fusable(Vector(fusing.Take(n)), "take")) /** * Terminate processing (and cancel the upstream publisher) after the given @@ -325,7 +278,7 @@ trait FlowOps[+Out] { * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = - andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any])) + andThen(Fusable(Vector(fusing.Conflate(seed, aggregate)), "conflate")) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older @@ -341,7 +294,7 @@ trait FlowOps[+Out] { * state. */ def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Repr[U] = - andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)])) + andThen(Fusable(Vector(fusing.Expand(seed, extrapolate)), "expand")) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. @@ -353,7 +306,7 @@ trait FlowOps[+Out] { */ def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = { require(size > 0, s"Buffer size must be larger than zero but was [$size]") - andThen(Buffer(size, overflowStrategy)) + andThen(Fusable(Vector(fusing.Buffer(size, overflowStrategy)), "buffer")) } /**