+str: Implement efficient stream op DSL

This commit is contained in:
Endre Sándor Varga 2014-10-08 18:16:57 +02:00
parent 3cd0770d46
commit 9ecd8b61fe
11 changed files with 1808 additions and 185 deletions
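This change replaces the Transformer-based implementations of simple combinators (and the dedicated Conflate/Expand/Buffer processor actors) with small fusable Op implementations driven by the new OneBoundedInterpreter inside an ActorInterpreter actor. As a rough sketch of the new shape (taken from the Flow changes further down), a combinator now materializes to a Fusable AST node instead of a Transformer:

def map[T](f: Out ⇒ T): Repr[T] =
  andThen(Fusable(Vector(fusing.Map(f)), "map"))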

View file

@@ -0,0 +1,36 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.impl.{ Ast, ActorBasedFlowMaterializer }
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import org.reactivestreams.{ Publisher, Processor }
class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
val processorCounter = new AtomicInteger
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
implicit val materializer = FlowMaterializer(settings)(system)
val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet()
val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode(
Ast.Fusable(Vector(akka.stream.impl.fusing.Map[Int, Int](identity)), "identity"), flowName, 1)
processor.asInstanceOf[Processor[Int, Int]]
}
override def createHelperPublisher(elements: Long): Publisher[Int] = {
implicit val mat = FlowMaterializer()(system)
createSimpleIntPublisher(elements)(mat)
}
}

View file

@@ -0,0 +1,490 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
import scala.util.control.NoStackTrace
class InterpreterSpec extends InterpreterSpecKit {
"Interpreter" must {
"implement map correctly" in new TestSetup(Seq(Map((x: Int) x + 1))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(2)))
upstream.onComplete()
lastEvents() should be(Set(OnComplete))
}
"implement chain of maps correctly" in new TestSetup(Seq(
Map((x: Int) ⇒ x + 1),
Map((x: Int) ⇒ x * 2),
Map((x: Int) ⇒ x + 1))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(OnNext(3)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(5)))
downstream.cancel()
lastEvents() should be(Set(Cancel))
}
"work with only boundary ops" in new TestSetup(Seq.empty) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(OnNext(0)))
upstream.onComplete()
lastEvents() should be(Set(OnComplete))
}
"implement one-to-many many-to-one chain correctly" in new TestSetup(Seq(
Doubler(),
Filter((x: Int) ⇒ x != 0))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onComplete()
lastEvents() should be(Set(OnComplete))
}
"implement many-to-one one-to-many chain correctly" in new TestSetup(Seq(
Filter((x: Int) ⇒ x != 0),
Doubler())) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
downstream.cancel()
lastEvents() should be(Set(Cancel))
}
"implement take" in new TestSetup(Seq(Take(2))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(OnNext(0)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(1), Cancel, OnComplete))
}
"implement take inside a chain" in new TestSetup(Seq(
Filter((x: Int) ⇒ x != 0),
Take(2),
Map((x: Int) ⇒ x + 1))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(Cancel, OnComplete, OnNext(3)))
}
"implement fold" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) agg + x))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(RequestOne))
upstream.onComplete()
lastEvents() should be(Set(OnNext(3), OnComplete))
}
"implement fold with proper cancel" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) agg + x))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(RequestOne))
downstream.cancel()
lastEvents() should be(Set(Cancel))
}
"work if fold completes while not in a push position" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) agg + x))) {
lastEvents() should be(Set.empty)
upstream.onComplete()
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(OnComplete, OnNext(0)))
}
"implement grouped" in new TestSetup(Seq(Grouped(3))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(Vector(0, 1, 2))))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(RequestOne))
upstream.onComplete()
lastEvents() should be(Set(OnNext(Vector(3)), OnComplete))
}
"implement conflate" in new TestSetup(Seq(Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x))) {
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set.empty)
upstream.onNext(0)
lastEvents() should be(Set(OnNext(0), RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(OnNext(3)))
downstream.requestOne()
lastEvents() should be(Set.empty)
upstream.onNext(4)
lastEvents() should be(Set(OnNext(4), RequestOne))
downstream.cancel()
lastEvents() should be(Set(Cancel))
}
"implement expand" in new TestSetup(Seq(Expand(
(in: Int) ⇒ in,
(agg: Int) ⇒ (agg, agg)))) {
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne, OnNext(0)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(0)))
upstream.onNext(1)
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne, OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(1)))
upstream.onComplete()
lastEvents() should be(Set(OnComplete))
}
"work with conflate-conflate" in new TestSetup(Seq(
Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x),
Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x))) {
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set.empty)
upstream.onNext(0)
lastEvents() should be(Set(OnNext(0), RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(OnNext(3)))
downstream.requestOne()
lastEvents() should be(Set.empty)
upstream.onNext(4)
lastEvents() should be(Set(OnNext(4), RequestOne))
downstream.cancel()
lastEvents() should be(Set(Cancel))
}
"work with expand-expand" in new TestSetup(Seq(
Expand(
(in: Int) ⇒ in,
(agg: Int) ⇒ (agg, agg)),
Expand(
(in: Int) ⇒ in,
(agg: Int) ⇒ (agg, agg)))) {
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(OnNext(0)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(0)))
upstream.onNext(1)
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne, OnNext(0))) // One zero is still in the pipeline
downstream.requestOne()
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(1)))
upstream.onComplete()
lastEvents() should be(Set(OnComplete))
}
"implement conflate-expand" in new TestSetup(Seq(
Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x),
Expand(
(in: Int) ⇒ in,
(agg: Int) ⇒ (agg, agg)))) {
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(OnNext(0)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(1)))
upstream.onNext(2)
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(OnNext(2)))
downstream.cancel()
lastEvents() should be(Set(Cancel))
}
"implement expand-conflate" in {
pending
// Needs to detect divergent loops
}
"implement doubler-conflate" in new TestSetup(Seq(
Doubler(),
Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x))) {
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(OnNext(6)))
}
"implement expand-filter" in pending
"implement take-conflate" in pending
"implement conflate-take" in pending
"implement take-expand" in pending
"implement expand-take" in pending
"implement take-take" in pending
"implement take-drop" in pending
"implement drop-take" in pending
val TE = new Exception("TEST") with NoStackTrace {
override def toString = "TE"
}
"handle external failure" in new TestSetup(Seq(Map((x: Int) x + 1))) {
lastEvents() should be(Set.empty)
upstream.onError(TE)
lastEvents() should be(Set(OnError(TE)))
}
"handle failure inside op" in new TestSetup(Seq(Map((x: Int) if (x == 0) throw TE else x))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(Cancel, OnError(TE)))
}
"handle failure inside op in middle of the chain" in new TestSetup(Seq(
Map((x: Int) ⇒ x + 1),
Map((x: Int) ⇒ if (x == 0) throw TE else x),
Map((x: Int) ⇒ x + 1))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(4)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(-1)
lastEvents() should be(Set(Cancel, OnError(TE)))
}
"work with keep-going ops" in pending
}
}

View file

@@ -0,0 +1,94 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
import akka.stream.testkit.AkkaSpec
trait InterpreterSpecKit extends AkkaSpec {
case object OnComplete
case object Cancel
case class OnError(cause: Throwable)
case class OnNext(elem: Any)
case object RequestOne
private[akka] case class Doubler[T]() extends DeterministicOp[T, T] {
var oneMore: Boolean = false
var lastElem: T = _
override def onPush(elem: T, ctxt: Context[T]): Directive = {
lastElem = elem
oneMore = true
ctxt.push(elem)
}
override def onPull(ctxt: Context[T]): Directive = {
if (oneMore) {
oneMore = false
ctxt.push(lastElem)
} else ctxt.pull()
}
}
abstract class TestSetup(ops: Seq[Op[_, _, _, _, _]], forkLimit: Int = 100, overflowToHeap: Boolean = false) {
private var lastEvent: Set[Any] = Set.empty
val upstream = new UpstreamProbe
val downstream = new DownstreamProbe
val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream, forkLimit, overflowToHeap)
interpreter.init()
def lastEvents(): Set[Any] = {
val result = lastEvent
lastEvent = Set.empty
result
}
class UpstreamProbe extends BoundaryOp {
override def onDownstreamFinish(ctxt: BoundaryContext): Directive = {
lastEvent += Cancel
ctxt.exit()
}
override def onPull(ctxt: BoundaryContext): Directive = {
lastEvent += RequestOne
ctxt.exit()
}
override def onPush(elem: Any, ctxt: BoundaryContext): Directive =
throw new UnsupportedOperationException("Cannot push the boundary")
def onNext(elem: Any): Unit = enter().push(elem)
def onComplete(): Unit = enter().finish()
def onError(cause: Throwable): Unit = enter().fail(cause)
}
class DownstreamProbe extends BoundaryOp {
override def onPush(elem: Any, ctxt: BoundaryContext): Directive = {
lastEvent += OnNext(elem)
ctxt.exit()
}
override def onUpstreamFinish(ctxt: BoundaryContext): Directive = {
lastEvent += OnComplete
ctxt.exit()
}
override def onFailure(cause: Throwable, ctxt: BoundaryContext): Directive = {
lastEvent += OnError(cause)
ctxt.exit()
}
override def onPull(ctxt: BoundaryContext): Directive =
throw new UnsupportedOperationException("Cannot pull the boundary")
def requestOne(): Unit = enter().pull()
def cancel(): Unit = enter().finish()
}
}
}

View file

@@ -0,0 +1,115 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
class InterpreterStressSpec extends InterpreterSpecKit {
val chainLength = 1000 * 1000
val halfLength = chainLength / 2
val repetition = 100
"Interpreter" must {
"work with a massive chain of maps" in new TestSetup(Seq.fill(chainLength)(Map((x: Int) x + 1))) {
lastEvents() should be(Set.empty)
val tstamp = System.nanoTime()
var i = 0
while (i < repetition) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(i)
lastEvents() should be(Set(OnNext(i + chainLength)))
i += 1
}
upstream.onComplete()
lastEvents() should be(Set(OnComplete))
val time = (System.nanoTime() - tstamp) / (1000.0 * 1000.0 * 1000.0)
// FIXME: Not a real benchmark, should be replaced by a proper JMH bench
info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s")
}
"work with a massive chain of maps with early complete" in new TestSetup(Seq.fill(halfLength)(Map((x: Int) x + 1)) ++
Seq(Take(repetition / 2)) ++
Seq.fill(halfLength)(Map((x: Int) x + 1))) {
lastEvents() should be(Set.empty)
val tstamp = System.nanoTime()
var i = 0
while (i < (repetition / 2) - 1) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(i)
lastEvents() should be(Set(OnNext(i + chainLength)))
i += 1
}
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(Cancel, OnComplete, OnNext(0 + chainLength)))
val time = (System.nanoTime() - tstamp) / (1000.0 * 1000.0 * 1000.0)
// FIXME: Not a real benchmark, should be replaced by a proper JMH bench
info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s")
}
"work with a massive chain of takes" in new TestSetup(Seq.fill(chainLength)(Take(1))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(Cancel, OnNext(0), OnComplete))
}
"work with a massive chain of drops" in new TestSetup(Seq.fill(chainLength / 1000)(Drop(1))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
var i = 0
while (i < (chainLength / 1000)) {
upstream.onNext(0)
lastEvents() should be(Set(RequestOne))
i += 1
}
upstream.onNext(0)
lastEvents() should be(Set(OnNext(0)))
}
"work with a massive chain of conflates by overflowing to the heap" in new TestSetup(Seq.fill(100000)(Conflate(
(in: Int) ⇒ in,
(agg: Int, in: Int) ⇒ agg + in)),
forkLimit = 100,
overflowToHeap = true) {
lastEvents() should be(Set(RequestOne))
var i = 0
while (i < repetition) {
upstream.onNext(1)
lastEvents() should be(Set(RequestOne))
i += 1
}
downstream.requestOne()
lastEvents() should be(Set(OnNext(repetition)))
}
}
}

View file

@@ -5,15 +5,16 @@ package akka.stream.scaladsl
import java.util.concurrent.atomic.AtomicLong
import akka.stream.impl.fusing.{ Op, ActorInterpreter }
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.{ Props, ActorRefFactory, ActorRef }
import akka.stream.{ TransformerLike, MaterializerSettings }
import akka.stream.FlowMaterializer
import akka.stream.impl.{ ActorProcessorFactory, StreamSupervisor, ActorBasedFlowMaterializer }
import akka.stream.impl.Ast.{ Transform, AstNode }
import akka.stream.impl.TransformProcessorImpl
import akka.stream.impl.{ ActorProcessorFactory, TransformProcessorImpl, StreamSupervisor, ActorBasedFlowMaterializer }
import akka.stream.impl.Ast.{ Transform, Fusable, AstNode }
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import akka.stream.testkit.ChainSetup
import akka.testkit._
@@ -45,6 +46,23 @@ object FlowSpec {
}
}
class BrokenActorInterpreter(
_settings: MaterializerSettings,
_ops: Seq[Op[_, _, _, _, _]],
brokenMessage: Any)
extends ActorInterpreter(_settings, _ops) {
import akka.stream.actor.ActorSubscriberMessage._
override protected[akka] def aroundReceive(receive: Receive, msg: Any) = {
msg match {
case OnNext(m) if m == brokenMessage ⇒
throw new NullPointerException(s"I'm so broken [$m]")
case _ ⇒ super.aroundReceive(receive, msg)
}
}
}
class BrokenFlowMaterializer(
settings: MaterializerSettings,
supervisor: ActorRef,
@@ -55,6 +73,7 @@ object FlowSpec {
override def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val props = op match {
case t: Transform ⇒ Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage))
case f: Fusable ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)).withDispatcher(settings.dispatcher)
case o ⇒ ActorProcessorFactory.props(this, o)
}
val impl = actorOf(props, s"$flowName-$n-${op.name}")
@@ -144,10 +163,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"deliver error signal when publisher immediately fails" in {
new ChainSetup(identity, settings, toPublisher) {
object WeirdError extends RuntimeException("weird test exception")
EventFilter[WeirdError.type](occurrences = 1) intercept {
upstreamSubscription.sendError(WeirdError)
downstream.expectError(WeirdError)
}
upstreamSubscription.sendError(WeirdError)
downstream.expectError(WeirdError)
}
}
@@ -521,12 +538,10 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
downstreamSubscription.request(1)
upstreamSubscription.expectRequest(1)
EventFilter[TestException.type](occurrences = 2) intercept {
upstreamSubscription.sendNext(5)
upstreamSubscription.expectRequest(1)
upstreamSubscription.expectCancellation()
downstream.expectError(TestException)
}
upstreamSubscription.sendNext(5)
upstreamSubscription.expectRequest(1)
upstreamSubscription.expectCancellation()
downstream.expectError(TestException)
val downstream2 = StreamTestKit.SubscriberProbe[String]()
publisher.subscribe(downstream2)

View file

@@ -5,6 +5,8 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicLong
import akka.stream.impl.fusing.{ ActorInterpreter, Op }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ Await, Future }
@@ -41,6 +43,8 @@ private[akka] object Ast {
case class TimerTransform(name: String, mkTransformer: () ⇒ TimerTransformer[Any, Any]) extends AstNode
case class Fusable(ops: immutable.Seq[Op[_, _, _, _, _]], name: String) extends AstNode
case class MapAsync(f: Any ⇒ Future[Any]) extends AstNode {
override def name = "mapAsync"
}
@@ -65,18 +69,6 @@ private[akka] object Ast {
override def name = "concatFlatten"
}
case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends AstNode {
override def name = "conflate"
}
case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends AstNode {
override def name = "expand"
}
case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode {
override def name = "buffer"
}
sealed trait JunctionAstNode {
def name: String
}
@@ -326,6 +318,7 @@ private[akka] object ActorProcessorFactory {
def props(materializer: FlowMaterializer, op: AstNode): Props = {
val settings = materializer.settings
(op match {
case Fusable(ops, _) ⇒ Props(new ActorInterpreter(materializer.settings, ops))
case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer()))
case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer()))
case m: MapAsync ⇒ Props(new MapAsyncProcessorImpl(settings, m.f))
@@ -334,9 +327,6 @@
case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n))
case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p))
case ConcatAll ⇒ Props(new ConcatAllImpl(materializer))
case cf: Conflate ⇒ Props(new ConflateImpl(settings, cf.seed, cf.aggregate))
case ex: Expand ⇒ Props(new ExpandImpl(settings, ex.seed, ex.extrapolate))
case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy))
}).withDispatcher(settings.dispatcher)
}

View file

@@ -1,94 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.MaterializerSettings
import akka.stream.OverflowStrategy
class ConflateImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends ActorProcessorImpl(_settings) {
var conflated: Any = null
val waitNextZero: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒
conflated = seed(primaryInputs.dequeueInputElement())
nextPhase(conflateThenEmit)
}
val conflateThenEmit: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒
if (primaryInputs.inputsAvailable) conflated = aggregate(conflated, primaryInputs.dequeueInputElement())
if (primaryOutputs.demandAvailable) {
primaryOutputs.enqueueOutputElement(conflated)
conflated = null
nextPhase(waitNextZero)
}
}
nextPhase(waitNextZero)
}
class ExpandImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends ActorProcessorImpl(_settings) {
var extrapolateState: Any = null
val waitFirst: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒
extrapolateState = seed(primaryInputs.dequeueInputElement())
nextPhase(emitFirst)
}
val emitFirst: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒
emitExtrapolate()
nextPhase(extrapolateOrReset)
}
val extrapolateOrReset: TransferPhase = TransferPhase(primaryInputs.NeedsInputOrComplete || primaryOutputs.NeedsDemand) { () ⇒
if (primaryInputs.inputsDepleted) nextPhase(completedPhase)
else if (primaryInputs.inputsAvailable) {
extrapolateState = seed(primaryInputs.dequeueInputElement())
nextPhase(emitFirst)
} else emitExtrapolate()
}
def emitExtrapolate(): Unit = {
val (emit, nextState) = extrapolate(extrapolateState)
primaryOutputs.enqueueOutputElement(emit)
extrapolateState = nextState
}
nextPhase(waitFirst)
}
class BufferImpl(_settings: MaterializerSettings, size: Int, overflowStrategy: OverflowStrategy) extends ActorProcessorImpl(_settings) {
import OverflowStrategy._
val buffer = FixedSizeBuffer(size)
val dropAction: () ⇒ Unit = overflowStrategy match {
case DropHead ⇒ buffer.dropHead
case DropTail ⇒ buffer.dropTail
case DropBuffer ⇒ buffer.clear
case Error ⇒ () ⇒ fail(new Error.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
case Backpressure ⇒ () ⇒ nextPhase(bufferFull)
}
val bufferEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒
buffer.enqueue(primaryInputs.dequeueInputElement())
nextPhase(bufferNonEmpty)
}
val bufferNonEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒
if (primaryOutputs.demandAvailable) {
primaryOutputs.enqueueOutputElement(buffer.dequeue())
if (buffer.isEmpty) nextPhase(bufferEmpty)
} else {
if (buffer.isFull) dropAction()
else buffer.enqueue(primaryInputs.dequeueInputElement())
}
}
val bufferFull: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒
primaryOutputs.enqueueOutputElement(buffer.dequeue())
if (buffer.isEmpty) nextPhase(bufferEmpty)
else nextPhase(bufferNonEmpty)
}
nextPhase(bufferEmpty)
}

View file

@@ -0,0 +1,273 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
import java.util.Arrays
import akka.actor.{ Actor, ActorRef }
import akka.event.Logging
import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants }
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
import akka.stream.impl._
import org.reactivestreams.{ Subscriber, Subscription }
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
private[akka] class BatchingActorInputBoundary(val size: Int) extends BoundaryOp {
require(size > 0, "buffer size cannot be zero")
require((size & (size - 1)) == 0, "buffer size must be a power of two")
// TODO: buffer and batch sizing heuristics
private var upstream: Subscription = _
private val inputBuffer = Array.ofDim[AnyRef](size)
private var inputBufferElements = 0
private var nextInputElementCursor = 0
private var upstreamCompleted = false
private var downstreamWaiting = false
private val IndexMask = size - 1
private def requestBatchSize = math.max(1, inputBuffer.length / 2)
private var batchRemaining = requestBatchSize
val subreceive: SubReceive = new SubReceive(waitingForUpstream)
private def dequeue(): Any = {
val elem = inputBuffer(nextInputElementCursor)
assert(elem ne null)
inputBuffer(nextInputElementCursor) = null
batchRemaining -= 1
if (batchRemaining == 0 && !upstreamCompleted) {
upstream.request(requestBatchSize)
batchRemaining = requestBatchSize
}
inputBufferElements -= 1
nextInputElementCursor = (nextInputElementCursor + 1) & IndexMask
elem
}
private def enqueue(elem: Any): Unit = {
if (!upstreamCompleted) {
if (inputBufferElements == size) throw new IllegalStateException("Input buffer overrun")
inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef]
inputBufferElements += 1
}
}
override def onPush(elem: Any, ctxt: BoundaryContext): Directive =
throw new UnsupportedOperationException("BUG: Cannot push the upstream boundary")
override def onPull(ctxt: BoundaryContext): Directive = {
if (inputBufferElements > 1) ctxt.push(dequeue())
else if (inputBufferElements == 1) {
if (upstreamCompleted) ctxt.pushAndFinish(dequeue())
else ctxt.push(dequeue())
} else if (upstreamCompleted) {
ctxt.finish()
} else {
downstreamWaiting = true
ctxt.exit()
}
}
override def onDownstreamFinish(ctxt: BoundaryContext): Directive = {
cancel()
ctxt.exit()
}
def cancel(): Unit = {
if (!upstreamCompleted) {
upstreamCompleted = true
if (upstream ne null) upstream.cancel()
downstreamWaiting = false
clear()
}
}
private def clear(): Unit = {
Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
inputBufferElements = 0
}
private def onComplete(): Unit = {
upstreamCompleted = true
subreceive.become(completed)
if (inputBufferElements == 0) enter().finish()
}
private def onSubscribe(subscription: Subscription): Unit = {
assert(subscription != null)
upstream = subscription
// Prefetch
upstream.request(inputBuffer.length)
subreceive.become(upstreamRunning)
}
private def onError(e: Throwable): Unit = {
upstreamCompleted = true
subreceive.become(completed)
enter().fail(e)
}
private def waitingForUpstream: Actor.Receive = {
case OnComplete ⇒ onComplete()
case OnSubscribe(subscription) ⇒ onSubscribe(subscription)
case OnError(cause) ⇒ onError(cause)
}
private def upstreamRunning: Actor.Receive = {
case OnNext(element) ⇒
enqueue(element)
if (downstreamWaiting) {
downstreamWaiting = false
enter().push(dequeue())
}
case OnComplete ⇒ onComplete()
case OnError(cause) ⇒ onError(cause)
case OnSubscribe(subscription) ⇒ subscription.cancel() // spec rule 2.5
}
private def completed: Actor.Receive = {
case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber")
}
}
/**
* INTERNAL API
*/
private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryOp {
private var exposedPublisher: ActorPublisher[Any] = _
private var subscriber: Subscriber[Any] = _
private var downstreamDemand: Long = 0L
// This flag is only used if complete/fail is called externally since this op turns into a Finished one inside the
// interpreter (i.e. inside this op the flag has no effect, since once completed the op will not be invoked)
private var downstreamCompleted = false
private var upstreamWaiting = true
val subreceive = new SubReceive(waitingExposedPublisher)
private def onNext(elem: Any): Unit = {
downstreamDemand -= 1
subscriber.onNext(elem)
}
private def complete(): Unit = {
if (!downstreamCompleted) {
downstreamCompleted = true
if (subscriber ne null) subscriber.onComplete()
if (exposedPublisher ne null) exposedPublisher.shutdown(None)
}
}
def fail(e: Throwable): Unit = {
if (!downstreamCompleted) {
downstreamCompleted = true
if (subscriber ne null) subscriber.onError(e)
if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
}
}
override def onPush(elem: Any, ctxt: BoundaryContext): Directive = {
onNext(elem)
if (downstreamDemand > 0) ctxt.pull()
else if (downstreamCompleted) ctxt.finish()
else {
upstreamWaiting = true
ctxt.exit()
}
}
override def onPull(ctxt: BoundaryContext): Directive =
throw new UnsupportedOperationException("BUG: Cannot pull the downstream boundary")
override def onUpstreamFinish(ctxt: BoundaryContext): Directive = {
complete()
ctxt.finish()
}
override def onFailure(cause: Throwable, ctxt: BoundaryContext): Directive = {
fail(cause)
ctxt.fail(cause)
}
private def subscribePending(subscribers: Seq[Subscriber[Any]]): Unit =
subscribers foreach { sub ⇒
if (subscriber eq null) {
subscriber = sub
subscriber.onSubscribe(new ActorSubscription(actor, subscriber))
} else sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}"))
}
protected def waitingExposedPublisher: Actor.Receive = {
case ExposedPublisher(publisher) ⇒
exposedPublisher = publisher
subreceive.become(downstreamRunning)
case other ⇒
throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]")
}
protected def downstreamRunning: Actor.Receive = {
case SubscribePending ⇒
subscribePending(exposedPublisher.takePendingSubscribers())
case RequestMore(subscription, elements) ⇒
// TODO centralize overflow protection
downstreamDemand += elements
if (downstreamDemand < 0) {
// Long has overflowed
val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue)
enter().finish()
fail(demandOverflowException)
} else if (upstreamWaiting) {
upstreamWaiting = false
enter().pull()
}
case Cancel(subscription) ⇒
downstreamCompleted = true
subscriber = null
exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException))
enter().finish()
}
}
/**
* INTERNAL API
*/
private[akka] class ActorInterpreter(val settings: MaterializerSettings, val ops: Seq[Op[_, _, _, _, _]])
extends Actor {
private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize)
private val downstream = new ActorOutputBoundary(self)
private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream)
interpreter.init()
def receive: Receive = upstream.subreceive orElse downstream.subreceive
override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
super.aroundReceive(receive, msg)
if (interpreter.isFinished) context.stop(self)
}
override def postStop(): Unit = {
upstream.cancel()
downstream.fail(new IllegalStateException("Processor actor terminated abruptly"))
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
throw new IllegalStateException("This actor cannot be restarted", reason)
}
}

View file

@@ -0,0 +1,511 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
import scala.annotation.tailrec
import scala.util.control.NonFatal
// TODO:
// fix jumpback table with keep-going-on-complete ops (we might jump between otherwise isolated execution regions)
// implement grouped, buffer
// add recover
trait Op[In, Out, PushD <: Directive, PullD <: Directive, Ctxt <: Context[Out]] {
private[fusing] var holding = false
private[fusing] var allowedToPush = false
private[fusing] var terminationPending = false
def isHolding: Boolean = holding
def isFinishing: Boolean = terminationPending
def onPush(elem: In, ctxt: Ctxt): PushD
def onPull(ctxt: Ctxt): PullD
def onUpstreamFinish(ctxt: Ctxt): Directive = ctxt.finish()
def onDownstreamFinish(ctxt: Ctxt): Directive = ctxt.finish()
def onFailure(cause: Throwable, ctxt: Ctxt): Directive = ctxt.fail(cause)
}
trait DeterministicOp[In, Out] extends Op[In, Out, Directive, Directive, Context[Out]]
trait DetachedOp[In, Out] extends Op[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out]]
trait BoundaryOp extends Op[Any, Any, Directive, Directive, BoundaryContext] {
private[fusing] var bctxt: BoundaryContext = _
def enter(): BoundaryContext = bctxt
}
trait TransitivePullOp[In, Out] extends DeterministicOp[In, Out] {
final override def onPull(ctxt: Context[Out]): Directive = ctxt.pull()
}
sealed trait Directive
sealed trait UpstreamDirective extends Directive
sealed trait DownstreamDirective extends Directive
sealed trait TerminationDirective extends Directive
final class FreeDirective extends UpstreamDirective with DownstreamDirective with TerminationDirective
sealed trait Context[Out] {
def push(elem: Out): DownstreamDirective
def pull(): UpstreamDirective
def finish(): FreeDirective
def pushAndFinish(elem: Out): DownstreamDirective
def fail(cause: Throwable): FreeDirective
def absorbTermination(): TerminationDirective
}
trait DetachedContext[Out] extends Context[Out] {
def hold(): FreeDirective
def pushAndPull(elem: Out): FreeDirective
}
trait BoundaryContext extends Context[Any] {
def exit(): FreeDirective
}
object OneBoundedInterpreter {
final val PhantomDirective = null
/**
* INTERNAL API
*
* This artificial op is used as a boundary to prevent two forked paths of execution (complete, cancel) to cross
* paths again. When finishing an op this op is injected in its place to isolate upstream and downstream execution
* domains.
*/
private[akka] object Finished extends BoundaryOp {
override def onPush(elem: Any, ctxt: BoundaryContext): UpstreamDirective = ctxt.finish()
override def onPull(ctxt: BoundaryContext): DownstreamDirective = ctxt.finish()
override def onUpstreamFinish(ctxt: BoundaryContext): Directive = ctxt.exit()
override def onDownstreamFinish(ctxt: BoundaryContext): Directive = ctxt.exit()
override def onFailure(cause: Throwable, ctxt: BoundaryContext): Directive = ctxt.exit()
}
}
/**
* One-bounded interpreter for a linear chain of stream operations (graph support is possible and will be implemented
* later)
*
* The ideas in this interpreter are an amalgamation of earlier ideas, notably:
* - The original effect-tracking implementation by Johannes Rudolph -- the difference here is that effects are not chained
* together as classes but the callstack is used instead and only certain combinations are allowed.
* - The on-stack reentrant implementation by Mathias Doenitz -- the difference here is that reentrancy is handled by the
* interpreter itself, not user code, and the interpreter is able to use the heap when needed instead of the
* callstack.
* - The pinball interpreter by Endre Sándor Varga -- the difference here is that the restriction of "one ball" is
* lifted by using isolated execution regions, completion handling is introduced and communication with the external
* world is done via boundary ops.
*
* The design goals/features of this interpreter are:
* - bounded callstack and heapless execution whenever possible
* - callstack usage should be constant for the most common ops independently of the size of the op-chain
* - allocation-free execution on the hot paths
* - enforced backpressure-safety (boundedness) on user defined ops at compile-time (and runtime in a few cases)
*
* The main driving idea of this interpreter is the concept of 1-bounded execution of well-formed free choice Petri
* nets (J. Desel and J. Esparza: Free Choice Petri Nets - https://www7.in.tum.de/~esparza/bookfc.html). Technically
* different kinds of operations partition the chain of ops into regions where *exactly one* event is active all the
* time. This "exactly one" property is enforced by proper types and runtime checks where needed. Currently there are
* three kinds of ops:
*
* - DeterministicOp implementations participate in 1-bounded regions. For every external non-completion signal these
* ops produce *exactly one* signal (completion is different, explained later) therefore keeping the number of events
* the same: exactly one.
*
* - DetachedOp implementations are boundaries between 1-bounded regions. This means that they need to enforce the
* "exactly one" property both on their upstream and downstream regions. As a consequence a DetachedOp can never
* answer an onPull with a ctxt.pull() or answer an onPush() with a ctxt.push() since such an action would "steal"
* the event from one region (resulting in zero signals) and would inject it to the other region (resulting in two
* signals). However DetachedOps have the ability to call ctxt.hold() as a response to onPush/onPull which temporarily
* takes the signal off and stops execution, at the same time putting the op in a "holding" state. If the op is in a
* holding state it contains one absorbed signal, therefore in this state the only possible command to call is
* ctxt.pushAndPull() which results in two events making the balance right again:
* 1 hold + 1 external event = 2 external events
* This mechanism allows synchronization between the upstream and downstream regions which otherwise can progress
* independently.
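*
* For illustration only (a hypothetical op, not part of this commit), a one-element detached buffer
* obeying this contract could look like:
*
* {{{
* case class OneElementBuffer[T]() extends DetachedOp[T, T] {
*   private var pending: Any = null
*   override def onPush(elem: T, ctxt: DetachedContext[T]): UpstreamDirective =
*     if (isHolding) ctxt.pushAndPull(elem) // a pull was absorbed earlier: release both signals at once
*     else { pending = elem; ctxt.hold() }  // absorb the push and backpressure upstream
*   override def onPull(ctxt: DetachedContext[T]): DownstreamDirective =
*     if (isHolding) { val e = pending.asInstanceOf[T]; pending = null; ctxt.pushAndPull(e) }
*     else ctxt.hold()                      // nothing buffered yet: absorb the pull
* }
* }}}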
*
* - BoundaryOp implementations are meant to communicate with the external world. These ops do not have most of the
* safety properties enforced and should be used carefully. One important ability of BoundaryOps is that they can take
* off an execution signal by calling ctxt.exit(). This is typically used immediately after an external signal has
* been produced (for example an actor message). BoundaryOps can also kickstart execution by calling enter() which
* returns a context they can use to inject signals into the interpreter. There are no checks in place to enforce that
* the number of signals taken out by exit() and the number of signals returned via enter() are the same -- using this
* op type needs extra care from the implementer.
* BoundaryOps are the elements that make the interpreter *tick*; there is no other way to start the interpreter
* than using a BoundaryOp.
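*
* For illustration only (hypothetical, not part of this commit), a boundary that counts outside demand
* and re-injects elements later could follow this pattern:
*
* {{{
* class DemandCountingBoundary extends BoundaryOp {
*   private var demand = 0L
*   override def onPull(ctxt: BoundaryContext): Directive = { demand += 1; ctxt.exit() } // take the signal out
*   override def onPush(elem: Any, ctxt: BoundaryContext): Directive = ctxt.exit()       // a real boundary would hand elem to the outside first
*   def offer(elem: Any): Unit = if (demand > 0) { demand -= 1; enter().push(elem) }      // put a signal back in later
* }
* }}}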
*
* Operations are allowed to do early completion and cancel/complete their upstreams and downstreams. It is *not*
* allowed however to do these independently to avoid isolated execution islands. The only call possible is ctxt.finish()
* which is a combination of cancel/complete.
* Since onComplete is not a backpressured signal it is sometimes preferable to push a final element and then immediately
* finish. This combination is exposed as pushAndFinish() which enables op writers to propagate completion events without
* waiting for an extra round of pull.
* Another peculiarity is how to convert termination events (complete/failure) into elements. The problem
* here is that the termination events are not backpressured while elements are. This means that simply calling ctxt.push()
* as a response to onUpstreamFinished() will very likely break boundedness and result in a buffer overflow somewhere.
* Therefore the only allowed command in this case is ctxt.absorbTermination() which stops the propagation of the
* termination signal, and puts the op in a finishing state. Depending on whether the op has a pending pull signal that it has
* not yet "consumed" by a push, its onPull() handler might be called immediately.
*
* In order to execute different individual execution regions the interpreter uses the callstack to schedule these. The
* current execution forking operations are
* - ctxt.finish() which starts a wave of completion and cancellation in two directions. When an op calls finish()
* it is immediately replaced by an artificial Finished op which makes sure that the two execution paths are isolated
* forever.
* - ctxt.fail() which is similar to finish()
* - ctxt.pushAndPull() which (as a response to a previous ctxt.hold()) starts a wave of downstream push and upstream
* pull. The two execution paths are isolated by the op itself since onPull() from downstream can only be answered by hold or
* push, while onPush() from upstream can only be answered by hold or pull -- it is impossible to "cross" the op.
* - ctxt.pushAndFinish() which is different from the forking ops above because the execution of push and finish happens on
* the same execution region and they are order dependent, too.
* The interpreter tracks the depth of recursive forking and allows various strategies of dealing with the situation
* when this depth reaches a certain limit. In the simplest case an error is reported (this is very useful for stress
* testing and finding callstack wasting bugs), in the other case the forked call is scheduled via a list -- i.e. instead
* of the stack the heap is used.
*/
class OneBoundedInterpreter(ops: Seq[Op[_, _, _, _, _]], val forkLimit: Int = 100, val overflowToHeap: Boolean = true) {
import OneBoundedInterpreter._
type UntypedOp = Op[Any, Any, Directive, Directive, DetachedContext[Any]]
require(ops.nonEmpty, "OneBoundedInterpreter cannot be created without at least one Op")
private val pipeline = ops.toArray.asInstanceOf[Array[UntypedOp]]
/**
* This table is used to accelerate demand propagation upstream. All ops that implement TransitivePullOp are guaranteed
* to only do upstream propagation of demand signals, therefore it is not necessary to execute them but enough to
* "jump over" them. This means that when a chain of one million maps gets a downstream demand it is propagated
* to the upstream *in one step* instead of one million onPull() calls.
* This table maintains the positions where execution should jump from a current position when a pull event is to
* be executed.
*/
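// Worked example (hypothetical op ordering, for illustration only): for the pipeline
//   [upstreamBoundary, Map, Map, Filter, downstreamBoundary]
// only the two boundaries are non-transitive, so calculateJumpBacks yields
//   [-1, 0, 0, 0, 0]
// and a pull issued at the downstream boundary (index 4) jumps directly back to index 0,
// skipping the three intermediate ops.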
private val jumpBacks: Array[Int] = calculateJumpBacks
private val Upstream = 0
private val Downstream = pipeline.length - 1
// Var to hold the current element if pushing. The only reason why this var is needed is to avoid allocations and
// make it possible for the Pushing state to be an object
private var elementInFlight: Any = _
// Points to the current point of execution inside the pipeline
private var activeOp = -1
// The current interpreter state that decides what happens at the next round
private var state: State = Pushing
// Counter that keeps track of the depth of recursive forked executions
private var forkCount = 0
// List that is used as an auxiliary stack if fork recursion depth reaches forkLimit
private var overflowStack = List.empty[(Int, State, Any)]
// see the jumpBacks variable for explanation
private def calculateJumpBacks: Array[Int] = {
val table = Array.ofDim[Int](pipeline.length)
var nextJumpBack = -1
for (pos ← 0 until pipeline.length) {
table(pos) = nextJumpBack
if (!pipeline(pos).isInstanceOf[TransitivePullOp[_, _]]) nextJumpBack = pos
}
table
}
private sealed trait State extends DetachedContext[Any] with BoundaryContext {
def advance(): Unit
override def push(elem: Any): DownstreamDirective = {
if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot push while holding, only pushAndPull")
pipeline(activeOp).allowedToPush = false
elementInFlight = elem
state = Pushing
PhantomDirective
}
override def pull(): UpstreamDirective = {
if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot pull while holding, only pushAndPull")
pipeline(activeOp).allowedToPush = !pipeline(activeOp).isInstanceOf[DetachedOp[_, _]]
state = Pulling
PhantomDirective
}
override def finish(): FreeDirective = {
fork(Completing)
state = Cancelling
PhantomDirective
}
override def pushAndFinish(elem: Any): DownstreamDirective = {
pipeline(activeOp) = Finished.asInstanceOf[UntypedOp]
// This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution
// path. Other forks are not order dependent because they execute on isolated execution domains which cannot
// "cross paths". This unsafeFork is relatively safe here because PushAndFinish simply absorbs all later downstream
// calls of pushAndFinish since the finish event has been scheduled already.
// It might be that there are some degenerate cases where this can blow up the stack with a very long chain but I
// am not aware of such scenario yet. If you know one, put it in InterpreterStressSpec :)
unsafeFork(PushFinish, elem)
elementInFlight = null
finish()
}
override def fail(cause: Throwable): FreeDirective = {
fork(Failing(cause))
state = Cancelling
PhantomDirective
}
override def hold(): FreeDirective = {
if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot hold while already holding")
pipeline(activeOp).holding = true
exit()
}
override def pushAndPull(elem: Any): FreeDirective = {
if (!pipeline(activeOp).holding) throw new IllegalStateException("Cannot pushAndPull without holding first")
pipeline(activeOp).holding = false
fork(Pushing, elem)
state = Pulling
PhantomDirective
}
override def absorbTermination(): TerminationDirective = {
pipeline(activeOp).holding = false
finish()
}
override def exit(): FreeDirective = {
elementInFlight = null
activeOp = -1
PhantomDirective
}
}
private object Pushing extends State {
override def advance(): Unit = {
activeOp += 1
pipeline(activeOp).onPush(elementInFlight, ctxt = this)
}
}
private object PushFinish extends State {
override def advance(): Unit = {
activeOp += 1
pipeline(activeOp).onPush(elementInFlight, ctxt = this)
}
override def pushAndFinish(elem: Any): DownstreamDirective = {
elementInFlight = elem
state = PushFinish
PhantomDirective
}
override def finish(): FreeDirective = {
state = Completing
PhantomDirective
}
}
private object Pulling extends State {
override def advance(): Unit = {
elementInFlight = null
activeOp = jumpBacks(activeOp)
pipeline(activeOp).onPull(ctxt = this)
}
override def hold(): FreeDirective = {
super.hold()
pipeline(activeOp).allowedToPush = true
PhantomDirective
}
}
private object Completing extends State {
override def advance(): Unit = {
elementInFlight = null
pipeline(activeOp) = Finished.asInstanceOf[UntypedOp]
activeOp += 1
if (!pipeline(activeOp).isFinishing) pipeline(activeOp).onUpstreamFinish(ctxt = this)
else exit()
}
override def finish(): FreeDirective = {
state = Completing
PhantomDirective
}
override def absorbTermination(): TerminationDirective = {
pipeline(activeOp).terminationPending = true
pipeline(activeOp).holding = false
// FIXME: This state is potentially corrupted by the jumpBackTable (not updated when jumping over)
if (pipeline(activeOp).allowedToPush) pipeline(activeOp).onPull(ctxt = Pulling)
else exit()
PhantomDirective
}
}
private object Cancelling extends State {
override def advance(): Unit = {
elementInFlight = null
pipeline(activeOp) = Finished.asInstanceOf[UntypedOp]
activeOp -= 1
if (!pipeline(activeOp).isFinishing) pipeline(activeOp).onDownstreamFinish(ctxt = this)
else exit()
}
override def finish(): FreeDirective = {
state = Cancelling
PhantomDirective
}
}
private final case class Failing(cause: Throwable) extends State {
override def advance(): Unit = {
elementInFlight = null
pipeline(activeOp) = Finished.asInstanceOf[UntypedOp]
activeOp += 1
pipeline(activeOp).onFailure(cause, ctxt = this)
}
override def absorbTermination(): TerminationDirective = {
pipeline(activeOp).terminationPending = true
pipeline(activeOp).holding = false
if (pipeline(activeOp).allowedToPush) pipeline(activeOp).onPull(ctxt = Pulling)
else exit()
PhantomDirective
}
}
@tailrec private def execute(): Unit = {
while (activeOp > -1 && activeOp < pipeline.length) {
try {
state.advance()
} catch {
case NonFatal(e) ⇒
try {
state.fail(e)
} catch {
case NonFatal(_) ⇒
// TODO: Make pipeline all failed
throw new IllegalStateException("Double Fault: Failure while handling failure", e)
}
}
}
// Execute all delayed forks that were put on the heap if the fork limit has been reached
if (overflowStack.nonEmpty) {
val memo = overflowStack.head
activeOp = memo._1
state = memo._2
elementInFlight = memo._3
overflowStack = overflowStack.tail
execute()
}
}
/**
* Forks off execution of the pipeline by saving current position, fully executing the effects of the given
* forkState then setting back the position to the saved value.
* By default forking is executed by using the callstack. If the depth of forking ever reaches the configured forkLimit
* this method either fails (useful for testing) or starts using the heap instead of the callstack to avoid a
* stack overflow.
*/
private def fork(forkState: State, elem: Any = null): Unit = {
forkCount += 1
if (forkCount == forkLimit) {
if (!overflowToHeap) throw new IllegalStateException("Fork limit reached")
else overflowStack ::= ((activeOp, forkState, elem))
} else unsafeFork(forkState, elem)
forkCount -= 1
}
/**
* Unsafe fork always uses the stack for execution. This call is needed by pushAndFinish where the forked execution
* is order dependent since the push and complete events travel in the same direction and are not isolated by a boundary
*/
private def unsafeFork(forkState: State, elem: Any = null): Unit = {
val savePos = activeOp
elementInFlight = elem
state = forkState
execute()
activeOp = savePos
PhantomDirective
}
def init(): Unit = {
initBoundaries()
runDetached()
}
def isFinished: Boolean = pipeline(Upstream) == Finished && pipeline(Downstream) == Finished
/**
* This method injects a Context to each of the BoundaryOps. This will be the context returned by enter().
*/
private def initBoundaries(): Unit = {
var op = 0
while (op < pipeline.length) {
if (pipeline(op).isInstanceOf[BoundaryOp]) {
pipeline(op).asInstanceOf[BoundaryOp].bctxt = new State {
val entryPoint = op
override def advance(): Unit = ()
override def push(elem: Any): DownstreamDirective = {
activeOp = entryPoint
super.push(elem)
execute()
PhantomDirective
}
override def pull(): UpstreamDirective = {
activeOp = entryPoint
super.pull()
execute()
PhantomDirective
}
override def finish(): FreeDirective = {
activeOp = entryPoint
super.finish()
execute()
PhantomDirective
}
override def fail(cause: Throwable): FreeDirective = {
activeOp = entryPoint
super.fail(cause)
execute()
PhantomDirective
}
override def hold(): FreeDirective = {
activeOp = entryPoint
super.hold()
execute()
PhantomDirective
}
override def pushAndPull(elem: Any): FreeDirective = {
activeOp = entryPoint
super.pushAndPull(elem)
execute()
PhantomDirective
}
}
}
op += 1
}
}
/**
* Starts execution of detached regions.
*
* Since detached ops partition the pipeline into different 1-bounded domains it is necessary to inject a starting
* signal into these regions (since there is no external signal that would kick off their execution otherwise).
*/
private def runDetached(): Unit = {
var op = pipeline.length - 1
while (op >= 0) {
if (pipeline(op).isInstanceOf[DetachedOp[_, _]]) {
activeOp = op
state = Pulling
execute()
}
op -= 1
}
}
}

View file

@@ -0,0 +1,240 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
import akka.stream.OverflowStrategy
import akka.stream.impl.FixedSizeBuffer
import scala.collection.immutable
/**
* INTERNAL API
*/
private[akka] case class Map[In, Out](f: In ⇒ Out) extends TransitivePullOp[In, Out] {
override def onPush(elem: In, ctxt: Context[Out]): Directive = ctxt.push(f(elem))
}
/**
* INTERNAL API
*/
private[akka] case class Filter[T](p: T ⇒ Boolean) extends TransitivePullOp[T, T] {
override def onPush(elem: T, ctxt: Context[T]): Directive =
if (p(elem)) ctxt.push(elem)
else ctxt.pull()
}
/**
* INTERNAL API
*/
private[akka] case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extends DeterministicOp[In, Out] {
private var currentIterator: Iterator[Out] = Iterator.empty
override def onPush(elem: In, ctxt: Context[Out]): Directive = {
currentIterator = f(elem).iterator
if (currentIterator.isEmpty) ctxt.pull()
else ctxt.push(currentIterator.next())
}
override def onPull(ctxt: Context[Out]): Directive =
if (currentIterator.hasNext) ctxt.push(currentIterator.next())
else ctxt.pull()
}
/**
* INTERNAL API
*/
private[akka] case class Take[T](count: Int) extends TransitivePullOp[T, T] {
private var left: Int = count
override def onPush(elem: T, ctxt: Context[T]): Directive = {
left -= 1
if (left == 0) ctxt.pushAndFinish(elem)
else ctxt.push(elem)
}
}
/**
* INTERNAL API
*/
private[akka] case class Drop[T](count: Int) extends TransitivePullOp[T, T] {
private var left: Int = count
override def onPush(elem: T, ctxt: Context[T]): Directive =
if (left > 0) {
left -= 1
ctxt.pull()
} else ctxt.push(elem)
}
/**
* INTERNAL API
*/
private[akka] case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends DeterministicOp[In, Out] {
private var aggregator = zero
override def onPush(elem: In, ctxt: Context[Out]): Directive = {
aggregator = f(aggregator, elem)
ctxt.pull()
}
override def onPull(ctxt: Context[Out]): Directive =
if (isFinishing) ctxt.pushAndFinish(aggregator)
else ctxt.pull()
override def onUpstreamFinish(ctxt: Context[Out]): Directive = ctxt.absorbTermination()
}
/**
* INTERNAL API
*/
private[akka] case class Grouped[T](n: Int) extends DeterministicOp[T, immutable.Seq[T]] {
private var buf: Vector[T] = Vector.empty
override def onPush(elem: T, ctxt: Context[immutable.Seq[T]]): Directive = {
buf :+= elem
if (buf.size == n) {
val emit = buf
buf = Vector.empty
ctxt.push(emit)
} else ctxt.pull()
}
override def onPull(ctxt: Context[immutable.Seq[T]]): Directive =
if (isFinishing) ctxt.pushAndFinish(buf)
else ctxt.pull()
override def onUpstreamFinish(ctxt: Context[immutable.Seq[T]]): Directive =
if (buf.isEmpty) ctxt.finish()
else ctxt.absorbTermination()
}
/**
* INTERNAL API
*/
private[akka] case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedOp[T, T] {
import OverflowStrategy._
private val buffer = FixedSizeBuffer(size)
override def onPush(elem: T, ctxt: DetachedContext[T]): UpstreamDirective =
if (isHolding) ctxt.pushAndPull(elem)
else enqueueAction(ctxt, elem)
override def onPull(ctxt: DetachedContext[T]): DownstreamDirective = {
if (isFinishing) {
val elem = buffer.dequeue().asInstanceOf[T]
if (buffer.isEmpty) ctxt.pushAndFinish(elem)
else ctxt.push(elem)
} else if (isHolding) ctxt.pushAndPull(buffer.dequeue().asInstanceOf[T])
else if (buffer.isEmpty) ctxt.hold()
else ctxt.push(buffer.dequeue().asInstanceOf[T])
}
override def onUpstreamFinish(ctxt: DetachedContext[T]): Directive =
if (buffer.isEmpty) ctxt.finish()
else ctxt.absorbTermination()
val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = {
overflowStrategy match {
case DropHead ⇒ { (ctxt, elem) ⇒
if (buffer.isFull) buffer.dropHead()
buffer.enqueue(elem)
ctxt.pull()
}
case DropTail ⇒ { (ctxt, elem) ⇒
if (buffer.isFull) buffer.dropTail()
buffer.enqueue(elem)
ctxt.pull()
}
case DropBuffer ⇒ { (ctxt, elem) ⇒
if (buffer.isFull) buffer.clear()
buffer.enqueue(elem)
ctxt.pull()
}
case Backpressure ⇒ { (ctxt, elem) ⇒
buffer.enqueue(elem)
if (buffer.isFull) ctxt.hold()
else ctxt.pull()
}
case Error ⇒ { (ctxt, elem) ⇒
if (buffer.isFull) ctxt.fail(new Error.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
else {
buffer.enqueue(elem)
ctxt.pull()
}
}
}
}
}
/**
* INTERNAL API
*/
private[akka] case class Completed[T]() extends DeterministicOp[T, T] {
override def onPush(elem: T, ctxt: Context[T]): Directive = ctxt.finish()
override def onPull(ctxt: Context[T]): Directive = ctxt.finish()
}
/**
* INTERNAL API
*/
private[akka] case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedOp[In, Out] {
private var agg: Any = null
override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = {
if (agg == null) agg = seed(elem)
else agg = aggregate(agg.asInstanceOf[Out], elem)
if (!isHolding) ctxt.pull() else {
val result = agg.asInstanceOf[Out]
agg = null
ctxt.pushAndPull(result)
}
}
override def onPull(ctxt: DetachedContext[Out]): DownstreamDirective = {
if (isFinishing) {
if (agg == null) ctxt.finish()
else {
val result = agg.asInstanceOf[Out]
agg = null
ctxt.pushAndFinish(result)
}
} else if (agg == null) ctxt.hold()
else {
val result = agg.asInstanceOf[Out]
agg = null
ctxt.push(result)
}
}
override def onUpstreamFinish(ctxt: DetachedContext[Out]): Directive = ctxt.absorbTermination()
}
/**
* INTERNAL API
*/
private[akka] case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: Seed ⇒ (Out, Seed)) extends DetachedOp[In, Out] {
private var s: Any = null
override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = {
s = seed(elem)
if (isHolding) {
val (emit, newS) = extrapolate(s.asInstanceOf[Seed])
s = newS
ctxt.pushAndPull(emit)
} else ctxt.hold()
}
override def onPull(ctxt: DetachedContext[Out]): DownstreamDirective = {
if (s == null) ctxt.hold()
else {
val (emit, newS) = extrapolate(s.asInstanceOf[Seed])
s = newS
if (isHolding) {
ctxt.pushAndPull(emit)
} else ctxt.push(emit)
}
}
}

View file

@@ -4,6 +4,7 @@
package akka.stream.scaladsl
import akka.stream.impl.Ast._
import akka.stream.impl.fusing
import akka.stream.{ TimerTransformer, Transformer, OverflowStrategy }
import akka.util.Collections.EmptyImmutableSeq
import scala.collection.immutable
@@ -101,24 +102,19 @@ trait RunnableFlow {
trait FlowOps[+Out] {
import FlowOps._
type Repr[+O]
import akka.stream.impl.fusing
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.
*/
def map[T](f: Out ⇒ T): Repr[T] =
transform("map", () ⇒ new Transformer[Out, T] {
def onNext(in: Out) = List(f(in))
})
def map[T](f: Out ⇒ T): Repr[T] = andThen(Fusable(Vector(fusing.Map(f)), "map"))
/**
* Transform each input element into a sequence of output elements that is
* then flattened into the output stream.
*/
def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] =
transform("mapConcat", () ⇒ new Transformer[Out, T] {
def onNext(in: Out) = f(in)
})
def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = andThen(Fusable(Vector(fusing.MapConcat(f)), "mapConcat"))
/**
* Transform this stream by applying the given function to each of the elements
@@ -148,20 +144,16 @@ trait FlowOps[+Out] {
/**
* Only pass on those elements that satisfy the given predicate.
*/
def filter(p: Out ⇒ Boolean): Repr[Out] =
transform("filter", () ⇒ new Transformer[Out, Out] {
def onNext(in: Out) = if (p(in)) List(in) else Nil
})
def filter(p: Out ⇒ Boolean): Repr[Out] = andThen(Fusable(Vector(fusing.Filter(p)), "filter"))
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.
* Non-matching elements are filtered out.
*/
def collect[T](pf: PartialFunction[Out, T]): Repr[T] =
transform("collect", () new Transformer[Out, T] {
def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil
})
def collect[T](pf: PartialFunction[Out, T]): Repr[T] = andThen(Fusable(Vector(
fusing.Filter(pf.isDefinedAt),
fusing.Map(pf.apply)), "filter"))
/**
* Chunk up this stream into groups of the given size, with the last group
@@ -171,19 +163,7 @@
*/
def grouped(n: Int): Repr[immutable.Seq[Out]] = {
require(n > 0, "n must be greater than 0")
transform("grouped", () new Transformer[Out, immutable.Seq[Out]] {
var buf: Vector[Out] = Vector.empty
def onNext(in: Out) = {
buf :+= in
if (buf.size == n) {
val group = buf
buf = Vector.empty
List(group)
} else
Nil
}
override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf)
})
andThen(Fusable(Vector(fusing.Grouped(n)), "grouped"))
}
/**
@@ -228,21 +208,8 @@
* No elements will be dropped if `n` is zero or negative.
*/
def drop(n: Int): Repr[Out] =
transform("drop", () new Transformer[Out, Out] {
var delegate: Transformer[Out, Out] =
if (n <= 0) identityTransformer.asInstanceOf[Transformer[Out, Out]]
else new Transformer[Out, Out] {
var c = n
def onNext(in: Out) = {
c -= 1
if (c == 0)
delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]]
Nil
}
}
def onNext(in: Out) = delegate.onNext(in)
})
if (n <= 0) andThen(Fusable(Vector.empty, "drop"))
else andThen(Fusable(Vector(fusing.Drop(n)), "drop"))
/**
* Discard the elements received within the given duration at beginning of the stream.
@@ -273,22 +240,8 @@
* or negative.
*/
def take(n: Int): Repr[Out] =
transform("take", () new Transformer[Out, Out] {
var delegate: Transformer[Out, Out] =
if (n <= 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
else new Transformer[Out, Out] {
var c = n
def onNext(in: Out) = {
c -= 1
if (c == 0)
delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
List(in)
}
}
def onNext(in: Out) = delegate.onNext(in)
override def isComplete = delegate.isComplete
})
if (n <= 0) andThen(Fusable(Vector(fusing.Completed()), "take"))
else andThen(Fusable(Vector(fusing.Take(n)), "take"))
/**
* Terminate processing (and cancel the upstream publisher) after the given
@@ -325,7 +278,7 @@
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] =
andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any]))
andThen(Fusable(Vector(fusing.Conflate(seed, aggregate)), "conflate"))
/**
* Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older
@@ -341,7 +294,7 @@
* state.
*/
def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Repr[U] =
andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)]))
andThen(Fusable(Vector(fusing.Expand(seed, extrapolate)), "expand"))
/**
* Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full.
@@ -353,7 +306,7 @@
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = {
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
andThen(Buffer(size, overflowStrategy))
andThen(Fusable(Vector(fusing.Buffer(size, overflowStrategy)), "buffer"))
}
/**