!str Implementation of multi-stream operations

- groupBy and splitWhen
- static fan-ins: merge, concat, zip
 - factored out input and output conditions
 - factored out side-stream management logic
Roland Kuhn 2014-03-30 09:27:19 +02:00
parent 2ab574bab6
commit 429f68e9d9
15 changed files with 1076 additions and 93 deletions

View file

@@ -44,6 +44,14 @@ trait Stream[T] {
f: (S, Try[T]) ⇒ (S, immutable.Seq[U]),
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U]
def groupBy[K](f: T ⇒ K): Stream[(K, Producer[T])]
def splitWhen(p: T ⇒ Boolean): Stream[Producer[T]]
def merge(other: Producer[T]): Stream[T]
def zip[U](other: Producer[U]): Stream[(T, U)]
def concat(next: Producer[T]): Stream[T]
def toFuture(generator: ProcessorGenerator): Future[T]
def consume(generator: ProcessorGenerator): Unit
def toProducer(generator: ProcessorGenerator): Producer[T]
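Taken together, these additions give Stream a compact fan-out/fan-in vocabulary: groupBy and splitWhen fan a stream out into substream producers, while merge, zip and concat fan two streams in. A minimal sketch of how they compose, assuming a ProcessorGenerator `gen` configured as in the specs further down (the producer names are illustrative only):

// Sketch only: `gen` and the two producers are assumptions mirroring the test setup below.
val numsA = Stream((1 to 10).iterator).toProducer(gen)
val numsB = Stream((11 to 20).iterator).toProducer(gen)

Stream(numsA).groupBy(_ % 2)        // Stream[(Int, Producer[Int])]: one substream per key
Stream(numsA).splitWhen(_ % 5 == 0) // Stream[Producer[Int]]: new substream at each match
Stream(numsA).merge(numsB)          // unordered union of both producers
Stream(numsA).zip(numsB)            // pairs until the shorter side completes
Stream(numsA).concat(numsB)         // numsB emitted only after numsA completes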

View file

@@ -3,12 +3,11 @@
*/
package akka.stream.impl
import java.util.Arrays
import scala.collection.immutable
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
import org.reactivestreams.api.Processor
import org.reactivestreams.spi.{ Subscriber, Subscription }
import org.reactivestreams.spi.Subscriber
import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import akka.stream.GeneratorSettings
import akka.event.LoggingReceive
@@ -21,6 +20,11 @@ private[akka] object ActorProcessor {
def props(settings: GeneratorSettings, op: AstNode): Props = op match {
case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t))
case r: Recover ⇒ Props(new RecoverProcessorImpl(settings, r))
case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p))
case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f))
case m: Merge ⇒ Props(new MergeImpl(settings, m.other))
case z: Zip ⇒ Props(new ZipImpl(settings, z.other))
case c: Concat ⇒ Props(new ConcatImpl(settings, c.other))
}
}
@@ -30,7 +34,6 @@ class ActorProcessor[I, O]( final val impl: ActorRef) extends Processor[I, O] wi
* INTERNAL API
*/
private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) extends Actor with SubscriberManagement[Any] with ActorLogging {
import ActorProcessor._
type S = ActorSubscription[Any]
override def maxBufferSize: Int = settings.maxFanOutBufferSize
@@ -39,9 +42,10 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
override def receive = waitingExposedPublisher
protected var primaryInputs: Inputs = _
////////////////////// Startup phases //////////////////////
var upstream: Subscription = _
var exposedPublisher: ActorPublisher[Any] = _
def waitingExposedPublisher: Receive = {
@@ -52,20 +56,27 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
}
def waitingForUpstream: Receive = downstreamManagement orElse {
case OnComplete ⇒ shutdown(completed = true) // There is nothing to flush here
case OnComplete ⇒
// Instead of introducing an edge case, handle it in the general way
primaryInputs = EmptyInputs
transitionToRunningWhenReady()
case OnSubscribe(subscription) ⇒
assert(subscription != null)
upstream = subscription
// Prime up input buffer
upstream.requestMore(inputBuffer.length)
context.become(running)
primaryInputs = new BatchingInputBuffer(subscription, settings.initialInputBufferSize)
transitionToRunningWhenReady()
case OnError(cause) ⇒ failureReceived(cause)
}
def transitionToRunningWhenReady(): Unit = if (primaryInputs ne null) {
primaryInputs.prefetch()
transferState = initialTransferState
context.become(running)
}
////////////////////// Management of subscribers //////////////////////
// All methods called here are implemented by SubscriberManagement
val downstreamManagement: Receive = {
def downstreamManagement: Receive = {
case SubscribePending ⇒
subscribePending()
case RequestMore(subscription, elements) ⇒
@@ -83,9 +94,10 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
def running: Receive = LoggingReceive(downstreamManagement orElse {
case OnNext(element) ⇒
enqueueInputElement(element)
primaryInputs.enqueueInputElement(element)
pump()
case OnComplete ⇒
primaryInputs.complete()
flushAndComplete()
pump()
case OnError(cause) ⇒ failureReceived(cause)
@@ -93,12 +105,15 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
// Called by SubscriberManagement when all subscribers are gone.
// The method shutdown() is called automatically by SubscriberManagement after it called this method.
override def cancelUpstream(): Unit = if (upstream ne null) upstream.cancel()
override def cancelUpstream(): Unit = {
if (primaryInputs ne null) primaryInputs.cancel()
PrimaryOutputs.cancel()
}
// Called by SubscriberManagement whenever the output buffer is ready to accept additional elements
override protected def requestFromUpstream(elements: Int): Unit = {
log.debug(s"received downstream demand from buffer: $elements")
downstreamBufferSpace += elements
PrimaryOutputs.enqueueOutputDemand(elements)
}
def failureReceived(e: Throwable): Unit = fail(e)
@@ -107,80 +122,45 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
shutdownReason = Some(e)
log.error(e, "failure during processing") // FIXME: escalate to supervisor instead
abortDownstream(e)
if (upstream ne null) upstream.cancel()
if (primaryInputs ne null) primaryInputs.cancel()
context.stop(self)
}
private var downstreamBufferSpace = 0
private var inputBuffer = Array.ofDim[AnyRef](settings.initialInputBufferSize)
private var inputBufferElements = 0
private var nextInputElementCursor = 0
val IndexMask = settings.initialInputBufferSize - 1
object PrimaryOutputs extends Outputs {
private var downstreamBufferSpace = 0
private var downstreamCompleted = false
def demandAvailable = downstreamBufferSpace > 0
// TODO: buffer and batch sizing heuristics
def requestBatchSize = math.max(1, inputBuffer.length / 2)
private var batchRemaining = requestBatchSize
def dequeueInputElement(): Any = {
val elem = inputBuffer(nextInputElementCursor)
inputBuffer(nextInputElementCursor) = null
batchRemaining -= 1
if (batchRemaining == 0 && !upstreamCompleted) {
upstream.requestMore(requestBatchSize)
batchRemaining = requestBatchSize
def enqueueOutputDemand(demand: Int): Unit = downstreamBufferSpace += demand
def enqueueOutputElement(elem: Any): Unit = {
downstreamBufferSpace -= 1
pushToDownstream(elem)
}
inputBufferElements -= 1
nextInputElementCursor += 1
nextInputElementCursor &= IndexMask
elem
def complete(): Unit = downstreamCompleted = true
def cancel(): Unit = downstreamCompleted = true
def isComplete: Boolean = downstreamCompleted
override val NeedsDemand: TransferState = new TransferState {
def isReady = demandAvailable
def isCompleted = downstreamCompleted
}
override def NeedsDemandOrCancel: TransferState = new TransferState {
def isReady = demandAvailable || downstreamCompleted
def isCompleted = false
}
}
def enqueueInputElement(elem: Any): Unit = {
inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef]
inputBufferElements += 1
}
def needsPrimaryInputAndDemand = primaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
def enqueueOutputElement(elem: Any): Unit = {
downstreamBufferSpace -= 1
pushToDownstream(elem)
}
// States of the operation that is executed by this processor
trait TransferState {
protected def isReady: Boolean
def isCompleted: Boolean
def isExecutable = isReady && !isCompleted
def inputsAvailable = inputBufferElements > 0
def demandAvailable = downstreamBufferSpace > 0
def inputsDepleted = upstreamCompleted && inputBufferElements == 0
}
object NeedsInput extends TransferState {
def isReady = inputsAvailable || inputsDepleted
def isCompleted = false
}
object NeedsDemand extends TransferState {
def isReady = demandAvailable
def isCompleted = false
}
object NeedsInputAndDemand extends TransferState {
def isReady = inputsAvailable && demandAvailable || inputsDepleted
def isCompleted = false
}
object Completed extends TransferState {
def isReady = false
def isCompleted = true
}
var transferState: TransferState = NeedsInputAndDemand
var transferState: TransferState = _
protected def initialTransferState: TransferState
// Exchange input buffer elements and output buffer "requests" until one of them becomes empty.
// Generate upstream requestMore for every Nth consumed input element
protected def pump(): Unit = {
try while (transferState.isExecutable) {
transferState = transfer()
log.debug(s"iterating the pump with state $transferState and buffer $bufferDebug")
transferState = transfer(transferState)
} catch { case NonFatal(e) ⇒ fail(e) }
log.debug(s"finished iterating the pump with state $transferState and buffer $bufferDebug")
@@ -188,9 +168,8 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
if (transferState.isCompleted) {
if (!isShuttingDown) {
log.debug("shutting down the pump")
if (!upstreamCompleted) upstream.cancel()
Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
inputBufferElements = 0
if (!primaryInputs.isCompleted) primaryInputs.cancel()
primaryInputs.clear()
context.become(flushing)
isShuttingDown = true
}
@@ -200,16 +179,11 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
// Needs to be implemented by Processor implementations. Transfers elements from the input buffer to the output
// buffer.
protected def transfer(current: TransferState): TransferState
protected def transfer(): TransferState
////////////////////// Completing and Flushing //////////////////////
var upstreamCompleted = false
protected def flushAndComplete(): Unit = {
upstreamCompleted = true
context.become(flushing)
}
protected def flushAndComplete(): Unit = context.become(flushing)
def flushing: Receive = downstreamManagement orElse {
case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber")
@@ -219,6 +193,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
////////////////////// Shutdown and cleanup (graceful and abort) //////////////////////
var isShuttingDown = false
var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
// Called by SubscriberManagement to signal that output buffer finished (flushed or aborted)
@@ -226,6 +201,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
isShuttingDown = true
if (completed)
shutdownReason = None
PrimaryOutputs.complete()
context.stop(self)
}
@@ -257,27 +233,34 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast
// TODO performance improvement: mutable buffer?
var emits = immutable.Seq.empty[Any]
override def transfer(current: TransferState): TransferState = {
val depleted = current.inputsDepleted
object NeedsInputAndDemandOrCompletion extends TransferState {
def isReady = primaryInputs.inputsAvailable && PrimaryOutputs.demandAvailable || primaryInputs.inputsDepleted
def isCompleted = false
}
override def initialTransferState = NeedsInputAndDemandOrCompletion
override def transfer(): TransferState = {
val depleted = primaryInputs.inputsDepleted
if (emits.isEmpty) {
isComplete = op.isComplete(state)
if (depleted || isComplete) {
emits = op.onComplete(state)
hasOnCompleteRun = true
} else {
val e = dequeueInputElement()
val e = primaryInputs.dequeueInputElement()
val (nextState, newEmits) = op.f(state, e)
state = nextState
emits = newEmits
}
} else {
enqueueOutputElement(emits.head)
PrimaryOutputs.enqueueOutputElement(emits.head)
emits = emits.tail
}
if (emits.nonEmpty) NeedsDemand
if (emits.nonEmpty) PrimaryOutputs.NeedsDemand
else if (hasOnCompleteRun) Completed
else NeedsInputAndDemand
else NeedsInputAndDemandOrCompletion
}
override def toString: String = s"Transformer(state=$state, isComplete=$isComplete, hasOnCompleteRun=$hasOnCompleteRun, emits=$emits)"
@@ -287,12 +270,39 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast
* INTERNAL API
*/
private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) {
override def enqueueInputElement(elem: Any): Unit = {
super.enqueueInputElement(Success(elem))
val WrapInSuccess: Receive = {
case OnNext(elem) ⇒
primaryInputs.enqueueInputElement(Success(elem))
pump()
}
override def running: Receive = WrapInSuccess orElse super.running
override def failureReceived(e: Throwable): Unit = {
super.enqueueInputElement(Failure(e))
primaryInputs.enqueueInputElement(Failure(e))
primaryInputs.complete()
flushAndComplete()
pump()
}
}
/**
* INTERNAL API
*/
private[akka] object IdentityProcessorImpl {
def props(settings: GeneratorSettings): Props = Props(new IdentityProcessorImpl(settings))
}
/**
* INTERNAL API
*/
private[akka] class IdentityProcessorImpl(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) {
override def initialTransferState = needsPrimaryInputAndDemand
override protected def transfer(): TransferState = {
PrimaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement())
needsPrimaryInputAndDemand
}
}
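IdentityProcessorImpl is the smallest client of the refactored contract: a concrete processor now supplies only initialTransferState and a transfer() step, and the shared pump() loop in ActorProcessorImpl drives both against the factored-out input and output conditions. For comparison, a hypothetical one-to-one mapping stage needs nothing beyond the same two overrides (MapProcessorImpl and its function parameter are illustrative, not part of this commit):

private[akka] class MapProcessorImpl(_settings: GeneratorSettings, f: Any ⇒ Any)
  extends ActorProcessorImpl(_settings) {
  // runnable whenever one input element and one unit of downstream demand are available
  override def initialTransferState = needsPrimaryInputAndDemand
  override protected def transfer(): TransferState = {
    // one element in, one transformed element out, then wait for input and demand again
    PrimaryOutputs.enqueueOutputElement(f(primaryInputs.dequeueInputElement()))
    needsPrimaryInputAndDemand
  }
}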

View file

@@ -0,0 +1,80 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import org.reactivestreams.spi.Subscription
import akka.actor.{ Terminated, Props, ActorRef }
import akka.stream.{ Stream, GeneratorSettings }
import akka.stream.impl._
/**
* INTERNAL API
*/
private[akka] object GroupByProcessorImpl {
trait SubstreamElementState
case object NoPending extends SubstreamElementState
case class PendingElement(elem: Any, key: Any) extends SubstreamElementState
case class PendingElementForNewStream(elem: Any, key: Any) extends SubstreamElementState
}
/**
* INTERNAL API
*/
private[akka] class GroupByProcessorImpl(settings: GeneratorSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) {
import GroupByProcessorImpl._
var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs]
var substreamPendingState: SubstreamElementState = NoPending
def substreamsFinished: Boolean = keyToSubstreamOutputs.isEmpty
override def initialTransferState = needsPrimaryInputAndDemand
override def transfer(): TransferState = substreamPendingState match {
case PendingElementForNewStream(elem, key) ⇒
if (PrimaryOutputs.isComplete) {
substreamPendingState = NoPending
// Just drop, we do not open any more substreams
primaryInputs.NeedsInput
} else {
val substreamOutput = newSubstream()
pushToDownstream((key, substreamOutput.processor))
keyToSubstreamOutputs(key) = substreamOutput
substreamPendingState = PendingElement(elem, key)
substreamOutput.NeedsDemand
}
case PendingElement(elem, key) ⇒
if (!keyToSubstreamOutputs(key).isComplete) keyToSubstreamOutputs(key).enqueueOutputElement(elem)
substreamPendingState = NoPending
primaryInputs.NeedsInput
case NoPending ⇒
val elem = primaryInputs.dequeueInputElement()
val key = keyFor(elem)
if (keyToSubstreamOutputs.contains(key)) {
substreamPendingState = PendingElement(elem, key)
keyToSubstreamOutputs(key).NeedsDemand
} else if (!PrimaryOutputs.isComplete) {
substreamPendingState = PendingElementForNewStream(elem, key)
PrimaryOutputs.NeedsDemand
} else primaryInputs.NeedsInput
}
override def invalidateSubstream(substream: ActorRef): Unit = {
substreamPendingState match {
case PendingElement(_, key) if keyToSubstreamOutputs(key).substream == substream ⇒
transferState = primaryInputs.NeedsInput
substreamPendingState = NoPending
case PendingElementForNewStream(_, key) if keyToSubstreamOutputs(key).substream == substream ⇒
transferState = primaryInputs.NeedsInput
substreamPendingState = NoPending
case _ ⇒
}
super.invalidateSubstream(substream)
}
}

View file

@@ -20,6 +20,11 @@ private[akka] object Ast {
case class Transform(zero: Any, f: (Any, Any) ⇒ (Any, immutable.Seq[Any]), onComplete: Any ⇒ immutable.Seq[Any], isComplete: Any ⇒ Boolean) extends AstNode
case class Recover(t: Transform) extends AstNode
case class GroupBy(f: Any ⇒ Any) extends AstNode
case class SplitWhen(p: Any ⇒ Boolean) extends AstNode
case class Merge(other: Producer[Any]) extends AstNode
case class Zip(other: Producer[Any]) extends AstNode
case class Concat(other: Producer[Any]) extends AstNode
trait ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I]

View file

@@ -0,0 +1,73 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.actor.{ Props, ActorRef }
import org.reactivestreams.spi.Subscription
import akka.stream.impl._
import akka.stream.{ Stream, GeneratorSettings }
import akka.actor.Terminated
/**
* INTERNAL API
*/
private[akka] object SplitWhenProcessorImpl {
trait SubstreamElementState
case object NoPending extends SubstreamElementState
case class PendingElement(elem: Any) extends SubstreamElementState
case class PendingElementForNewStream(elem: Any) extends SubstreamElementState
}
/**
* INTERNAL API
*/
private[akka] class SplitWhenProcessorImpl(_settings: GeneratorSettings, val splitPredicate: Any ⇒ Boolean)
extends MultiStreamOutputProcessor(_settings) {
import SplitWhenProcessorImpl._
var pendingElement: SubstreamElementState = NoPending
var started = false
var currentSubstream: SubstreamOutputs = _
override def initialTransferState = needsPrimaryInputAndDemand
override def transfer(): TransferState = pendingElement match {
case NoPending ⇒
val elem = primaryInputs.dequeueInputElement()
if (!started) {
pendingElement = PendingElementForNewStream(elem)
started = true
PrimaryOutputs.NeedsDemand
} else if (splitPredicate(elem)) {
pendingElement = PendingElementForNewStream(elem)
currentSubstream.complete()
PrimaryOutputs.NeedsDemand
} else if (!currentSubstream.isComplete) {
pendingElement = PendingElement(elem)
currentSubstream.NeedsDemand
} else primaryInputs.NeedsInput
case PendingElement(elem) ⇒
currentSubstream.enqueueOutputElement(elem)
pendingElement = NoPending
primaryInputs.NeedsInput
case PendingElementForNewStream(elem) ⇒
val substreamOutput = newSubstream()
pushToDownstream(substreamOutput.processor)
currentSubstream = substreamOutput
pendingElement = PendingElement(elem)
currentSubstream.NeedsDemand
}
override def invalidateSubstream(substream: ActorRef): Unit = {
pendingElement match {
case PendingElement(_) ⇒
transferState = primaryInputs.NeedsInput
pendingElement = NoPending
case _ ⇒
}
super.invalidateSubstream(substream)
}
}

View file

@@ -0,0 +1,74 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.GeneratorSettings
import org.reactivestreams.api.Producer
import scala.concurrent.forkjoin.ThreadLocalRandom
/**
* INTERNAL API
*/
private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any])
extends TwoStreamInputProcessor(_settings, _other) {
def needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && PrimaryOutputs.NeedsDemand
override def initialTransferState = needsAnyInputAndDemand
override def transfer(): TransferState = {
// TODO: More flexible merging strategies are possible here. This takes a random element if we have elements
// from both upstreams.
val tieBreak = ThreadLocalRandom.current().nextBoolean()
if (primaryInputs.inputsAvailable && (!secondaryInputs.inputsAvailable || tieBreak)) {
PrimaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement())
} else {
PrimaryOutputs.enqueueOutputElement(secondaryInputs.dequeueInputElement())
}
needsAnyInputAndDemand
}
}
/**
* INTERNAL API
*/
private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any])
extends TwoStreamInputProcessor(_settings, _other) {
def needsBothInputAndDemand = primaryInputs.NeedsInput && secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
override def initialTransferState = needsBothInputAndDemand
override protected def transfer(): TransferState = {
PrimaryOutputs.enqueueOutputElement((primaryInputs.dequeueInputElement(), secondaryInputs.dequeueInputElement()))
needsBothInputAndDemand
}
}
/**
* INTERNAL API
*/
private[akka] class ConcatImpl(_settings: GeneratorSettings, _other: Producer[Any])
extends TwoStreamInputProcessor(_settings, _other) {
def needsPrimaryInputAndDemandWithComplete = primaryInputs.NeedsInputOrComplete && PrimaryOutputs.NeedsDemand
def needsSecondaryInputAndDemand = secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
var processingPrimary = true
override protected def initialTransferState: TransferState = needsPrimaryInputAndDemandWithComplete
override protected def transfer(): TransferState = {
if (processingPrimary) {
if (primaryInputs.inputsDepleted) {
processingPrimary = false
needsSecondaryInputAndDemand
} else {
PrimaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement())
needsPrimaryInputAndDemandWithComplete
}
} else {
PrimaryOutputs.enqueueOutputElement(secondaryInputs.dequeueInputElement())
needsSecondaryInputAndDemand
}
}
}
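Since the fan-in preconditions are just composed TransferStates, the merge policy is localized to these two members. A hypothetical variant that deterministically drains the primary side first, sketched against the same TwoStreamInputProcessor API (illustrative, not part of this commit):

private[akka] class PreferPrimaryMergeImpl(_settings: GeneratorSettings, _other: Producer[Any])
  extends TwoStreamInputProcessor(_settings, _other) {
  def needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && PrimaryOutputs.NeedsDemand
  override def initialTransferState = needsAnyInputAndDemand
  override protected def transfer(): TransferState = {
    // deterministic bias toward the primary input instead of MergeImpl's random tie-break
    val elem =
      if (primaryInputs.inputsAvailable) primaryInputs.dequeueInputElement()
      else secondaryInputs.dequeueInputElement()
    PrimaryOutputs.enqueueOutputElement(elem)
    needsAnyInputAndDemand
  }
}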

View file

@@ -61,6 +61,16 @@ private[akka] case class StreamImpl[I, O](producerNode: Ast.ProducerNode[I], ops
onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]],
isComplete.asInstanceOf[Any ⇒ Boolean])))
override def zip[O2](other: Producer[O2]): Stream[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]]))
override def concat(next: Producer[O]): Stream[O] = andThen(Concat(next.asInstanceOf[Producer[Any]]))
override def merge(other: Producer[O]): Stream[O] = andThen(Merge(other.asInstanceOf[Producer[Any]]))
override def splitWhen(p: (O) ⇒ Boolean): Stream[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean]))
override def groupBy[K](f: (O) ⇒ K): Stream[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any]))
def toFuture(generator: ProcessorGenerator): Future[O] = {
val p = Promise[O]()
transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, isComplete = _ == 1).consume(generator)

View file

@@ -0,0 +1,177 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.GeneratorSettings
import akka.actor.{ Terminated, ActorRef }
import org.reactivestreams.spi.{ Subscriber, Subscription }
import org.reactivestreams.api.Producer
/**
* INTERNAL API
*/
private[akka] object MultiStreamOutputProcessor {
case class SubstreamRequestMore(substream: ActorRef, demand: Int)
case class SubstreamCancel(substream: ActorRef)
class SubstreamSubscription( final val parent: ActorRef, final val substream: ActorRef) extends Subscription {
override def requestMore(elements: Int): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
else parent ! SubstreamRequestMore(substream, elements)
override def cancel(): Unit = parent ! SubstreamCancel(substream)
override def toString = "SubstreamSubscription" + System.identityHashCode(this)
}
}
/**
* INTERNAL API
*/
private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) {
import MultiStreamOutputProcessor._
private val substreamOutputs = collection.mutable.Map.empty[ActorRef, SubstreamOutputs]
class SubstreamOutputs extends Outputs {
private var completed: Boolean = false
private var demands: Int = 0
val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings)))
val processor = new ActorProcessor[AnyRef, AnyRef](substream)
override def isComplete: Boolean = completed
override def complete(): Unit = {
if (!completed) substream ! OnComplete
completed = true
}
override def cancel(): Unit = completed = true
override def enqueueOutputElement(elem: Any): Unit = {
demands -= 1
substream ! OnNext(elem)
}
def enqueueOutputDemand(demand: Int): Unit = demands += demand
override def demandAvailable: Boolean = demands > 0
override val NeedsDemand: TransferState = new TransferState {
override def isReady: Boolean = demandAvailable
override def isCompleted: Boolean = completed
}
override val NeedsDemandOrCancel: TransferState = new TransferState {
override def isReady: Boolean = demandAvailable || isComplete
override def isCompleted: Boolean = false
}
}
protected def newSubstream(): SubstreamOutputs = {
val outputs = new SubstreamOutputs
outputs.substream ! OnSubscribe(new SubstreamSubscription(self, outputs.substream))
substreamOutputs(outputs.substream) = outputs
outputs
}
protected def invalidateSubstream(substream: ActorRef): Unit = {
substreamOutputs(substream).complete()
substreamOutputs -= substream
if ((isShuttingDown || PrimaryOutputs.isComplete) && context.children.isEmpty) context.stop(self)
pump()
}
override def fail(e: Throwable): Unit = {
context.children foreach (_ ! OnError(e))
super.fail(e)
}
override def shutdown(completed: Boolean): Unit = {
// If the master stream is cancelled (no one consumes substreams) then this callback does not mean we are shutting down
// We can only shut down after all substreams are closed
//if (!PrimaryOutputs.isComplete) {
if (context.children.isEmpty) super.shutdown(completed)
//}
}
override def completeDownstream(): Unit = {
context.children foreach (_ ! OnComplete)
super.completeDownstream()
}
override val downstreamManagement: Receive = super.downstreamManagement orElse {
case SubstreamRequestMore(key, demand) ⇒
substreamOutputs(key).enqueueOutputDemand(demand)
pump()
case SubstreamCancel(key) ⇒ // FIXME: Terminated should handle this case. Maybe remove SubstreamCancel and just Poison self?
case Terminated(child) ⇒ invalidateSubstream(child)
}
}
/**
* INTERNAL API
*/
private[akka] object TwoStreamInputProcessor {
class OtherActorSubscriber[T]( final val impl: ActorRef) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! OnError(cause)
override def onComplete(): Unit = impl ! OtherStreamOnComplete
override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element)
override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription)
}
case object OtherStreamOnComplete
case class OtherStreamOnNext(element: Any)
case class OtherStreamOnSubscribe(subscription: Subscription)
}
/**
* INTERNAL API
*/
private[akka] abstract class TwoStreamInputProcessor(_settings: GeneratorSettings, val other: Producer[Any])
extends ActorProcessorImpl(_settings) {
import TwoStreamInputProcessor._
var secondaryInputs: Inputs = _
other.getPublisher.subscribe(new OtherActorSubscriber(self))
override def waitingForUpstream: Receive = super.waitingForUpstream orElse {
case OtherStreamOnComplete ⇒
secondaryInputs = EmptyInputs
transitionToRunningWhenReady()
case OtherStreamOnSubscribe(subscription) ⇒
assert(subscription != null)
secondaryInputs = new BatchingInputBuffer(subscription, settings.initialInputBufferSize)
transitionToRunningWhenReady()
}
override def running: Receive = super.running orElse {
case OtherStreamOnNext(element) ⇒
secondaryInputs.enqueueInputElement(element)
pump()
case OtherStreamOnComplete ⇒
secondaryInputs.complete()
flushAndComplete()
pump()
}
override def flushAndComplete(): Unit = {
if (secondaryInputs.isCompleted && primaryInputs.isCompleted)
super.flushAndComplete()
}
override def transitionToRunningWhenReady(): Unit = if ((primaryInputs ne null) && (secondaryInputs ne null)) {
secondaryInputs.prefetch()
super.transitionToRunningWhenReady()
}
override def fail(cause: Throwable): Unit = {
if (secondaryInputs ne null) secondaryInputs.cancel()
super.fail(cause)
}
override def cancelUpstream(): Unit = {
if (secondaryInputs ne null) secondaryInputs.cancel()
super.cancelUpstream()
}
}

View file

@@ -0,0 +1,138 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import org.reactivestreams.spi.Subscription
import java.util.Arrays
trait Inputs {
def NeedsInput: TransferState
def NeedsInputOrComplete: TransferState
def enqueueInputElement(elem: Any): Unit
def dequeueInputElement(): Any
def cancel(): Unit
def complete(): Unit
def isCompleted: Boolean
def prefetch(): Unit
def clear(): Unit
def inputsDepleted: Boolean
def inputsAvailable: Boolean
}
trait Outputs {
def NeedsDemand: TransferState
def NeedsDemandOrCancel: TransferState
def demandAvailable: Boolean
def enqueueOutputElement(elem: Any): Unit
def complete(): Unit
def cancel(): Unit
def isComplete: Boolean
}
// States of the operation that is executed by this processor
trait TransferState {
def isReady: Boolean
def isCompleted: Boolean
def isExecutable = isReady && !isCompleted
def ||(other: TransferState): TransferState = new TransferState {
def isReady: Boolean = TransferState.this.isReady || other.isReady
def isCompleted: Boolean = TransferState.this.isCompleted && other.isCompleted
}
def &&(other: TransferState): TransferState = new TransferState {
def isReady: Boolean = TransferState.this.isReady && other.isReady
def isCompleted: Boolean = TransferState.this.isCompleted || other.isCompleted
}
}
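The composition semantics are deliberately dual: `&&` is ready only when both operands are ready but completes as soon as either completes, while `||` is ready when either operand is ready and completes only once both have. A small self-contained check (the `state` factory is a hypothetical stub standing in for states such as NeedsInput and NeedsDemand):

object TransferStateSemanticsDemo extends App {
  def state(ready: Boolean, completed: Boolean): TransferState = new TransferState {
    def isReady = ready
    def isCompleted = completed
  }
  val inputReady = state(ready = true, completed = false)  // e.g. NeedsInput with buffered elements
  val outputDone = state(ready = false, completed = true)  // e.g. NeedsDemand after cancellation

  assert((inputReady && outputDone).isCompleted)  // && halts once either side is finished
  assert((inputReady || outputDone).isReady)      // || runs while either side is ready
  assert(!(inputReady || outputDone).isCompleted) // || completes only when both sides have
}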
object Completed extends TransferState {
def isReady = false
def isCompleted = true
}
object EmptyInputs extends Inputs {
override def inputsAvailable: Boolean = false
override def inputsDepleted: Boolean = true
override def isCompleted: Boolean = true
override def complete(): Unit = ()
override def cancel(): Unit = ()
override def prefetch(): Unit = ()
override def clear(): Unit = ()
override def dequeueInputElement(): Any = throw new UnsupportedOperationException("Cannot dequeue from EmptyInputs")
override def enqueueInputElement(elem: Any): Unit = throw new UnsupportedOperationException("Cannot enqueue to EmptyInputs")
override val NeedsInputOrComplete: TransferState = new TransferState {
override def isReady: Boolean = true
override def isCompleted: Boolean = false
}
override val NeedsInput: TransferState = Completed
}
class BatchingInputBuffer(val upstream: Subscription, val size: Int) extends Inputs {
// TODO: buffer and batch sizing heuristics
private var inputBuffer = Array.ofDim[AnyRef](size)
private var inputBufferElements = 0
private var nextInputElementCursor = 0
private var upstreamCompleted = false
val IndexMask = size - 1
private def requestBatchSize = math.max(1, inputBuffer.length / 2)
private var batchRemaining = requestBatchSize
def prefetch(): Unit = upstream.requestMore(inputBuffer.length)
def dequeueInputElement(): Any = {
val elem = inputBuffer(nextInputElementCursor)
inputBuffer(nextInputElementCursor) = null
batchRemaining -= 1
if (batchRemaining == 0 && !upstreamCompleted) {
upstream.requestMore(requestBatchSize)
batchRemaining = requestBatchSize
}
inputBufferElements -= 1
nextInputElementCursor += 1
nextInputElementCursor &= IndexMask
elem
}
def enqueueInputElement(elem: Any): Unit = {
inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef]
inputBufferElements += 1
}
def complete(): Unit = upstreamCompleted = true
def cancel(): Unit = {
if (!upstreamCompleted) upstream.cancel()
upstreamCompleted = true
}
def isCompleted: Boolean = upstreamCompleted
def clear(): Unit = {
Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
inputBufferElements = 0
}
def inputsDepleted = upstreamCompleted && inputBufferElements == 0
def inputsAvailable = inputBufferElements > 0
override val NeedsInput: TransferState = new TransferState {
def isReady = inputsAvailable
def isCompleted = inputsDepleted
}
override val NeedsInputOrComplete: TransferState = new TransferState {
def isReady = inputsAvailable || inputsDepleted
def isCompleted = false
}
}
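The batching heuristic amortizes Reactive Streams traffic: prefetch() requests a full buffer once, and afterwards every requestBatchSize-th dequeue (half the buffer, at least 1) issues a single batched requestMore rather than one request per element. A sketch against a recording stub Subscription (the stub and the printed trace are illustrative assumptions):

object BatchingInputBufferDemo extends App {
  val requests = scala.collection.mutable.Buffer.empty[Int]
  val stub = new Subscription {
    def requestMore(elements: Int): Unit = requests += elements
    def cancel(): Unit = ()
  }
  val buf = new BatchingInputBuffer(stub, size = 4) // size is assumed to be a power of two (see IndexMask)
  buf.prefetch()                                    // requestMore(4): prime a full buffer
  (1 to 4) foreach buf.enqueueInputElement
  (1 to 4) foreach (_ ⇒ buf.dequeueInputElement())  // two batches of requestBatchSize = 2 while draining
  println(requests)                                 // ArrayBuffer(4, 2, 2)
}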

View file

@@ -0,0 +1,41 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.Stream
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
class StreamConcatSpec extends AkkaSpec {
import system.dispatcher
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2), system)
"Concat" must {
"work in the happy case" in {
val source0 = Stream(List.empty[Int].iterator).toProducer(gen)
val source1 = Stream((1 to 4).iterator).toProducer(gen)
val source2 = Stream((5 to 10).iterator).toProducer(gen)
val p = Stream(source0).concat(source1).concat(source2).toProducer(gen)
val probe = StreamTestKit.consumerProbe[Int]
p.produceTo(probe)
val subscription = probe.expectSubscription()
for (i ← 1 to 10) {
subscription.requestMore(1)
probe.expectNext(i)
}
probe.expectComplete()
}
}
}

View file

@@ -0,0 +1,165 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit._
import akka.testkit.AkkaSpec
import org.reactivestreams.api.Producer
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.Stream
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamGroupBySpec extends AkkaSpec {
import system.dispatcher
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2), system)
case class StreamPuppet(p: Producer[Int]) {
val probe = StreamTestKit.consumerProbe[Int]
p.produceTo(probe)
val subscription = probe.expectSubscription()
def requestMore(demand: Int): Unit = subscription.requestMore(demand)
def expectNext(elem: Int): Unit = probe.expectNext(elem)
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def expectComplete(): Unit = probe.expectComplete()
def cancel(): Unit = subscription.cancel()
}
class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) {
val source = Stream((1 to elementCount).iterator).toProducer(gen)
val groupStream = Stream(source).groupBy(_ % groupCount).toProducer(gen)
val masterConsumer = StreamTestKit.consumerProbe[(Int, Producer[Int])]
groupStream.produceTo(masterConsumer)
val masterSubscription = masterConsumer.expectSubscription()
def getSubproducer(expectedKey: Int): Producer[Int] = {
masterSubscription.requestMore(1)
expectSubproducer(expectedKey: Int)
}
def expectSubproducer(expectedKey: Int): Producer[Int] = {
val (key, substream) = masterConsumer.expectNext()
key should be(expectedKey)
substream
}
}
"groupBy" must {
"work in the happy case" in new SubstreamsSupport(groupCount = 2) {
val s1 = StreamPuppet(getSubproducer(1))
masterConsumer.expectNoMsg(100.millis)
s1.expectNoMsg(100.millis)
s1.requestMore(1)
s1.expectNext(1)
s1.expectNoMsg(100.millis)
val s2 = StreamPuppet(getSubproducer(0))
masterConsumer.expectNoMsg(100.millis)
s2.expectNoMsg(100.millis)
s2.requestMore(2)
s2.expectNext(2)
s2.expectNext(4)
s2.expectNoMsg(100.millis)
s1.requestMore(1)
s1.expectNext(3)
s2.requestMore(1)
s2.expectNext(6)
s2.expectComplete()
s1.requestMore(1)
s1.expectNext(5)
s1.expectComplete()
masterConsumer.expectComplete()
}
"accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) {
StreamPuppet(getSubproducer(1)).cancel()
val substream = StreamPuppet(getSubproducer(0))
masterConsumer.expectNoMsg(100.millis)
substream.requestMore(2)
substream.expectNext(2)
substream.expectNext(4)
substream.expectNoMsg(100.millis)
substream.requestMore(2)
substream.expectNext(6)
substream.expectComplete()
masterConsumer.expectComplete()
}
"accept cancellation of master stream when not consumed anything" in new SubstreamsSupport(groupCount = 2) {
masterSubscription.cancel()
masterConsumer.expectNoMsg(100.millis)
}
"accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) {
pending
// val substream = StreamPuppet(getSubproducer(1))
//
// substream.requestMore(1)
// substream.expectNext(1)
//
// masterSubscription.cancel()
// masterConsumer.expectNoMsg(100.millis)
//
// // Open substreams still work, others are discarded
// substream.requestMore(4)
// substream.expectNext(4)
// substream.expectNext(7)
// substream.expectNext(10)
// substream.expectNext(13)
// substream.expectComplete()
}
"work with fanout on substreams" in new SubstreamsSupport(groupCount = 2) {
val substreamProducer = getSubproducer(1)
getSubproducer(0)
val substreamConsumer1 = StreamPuppet(substreamProducer)
val substreamConsumer2 = StreamPuppet(substreamProducer)
substreamConsumer1.requestMore(1)
substreamConsumer1.expectNext(1)
substreamConsumer2.requestMore(1)
substreamConsumer2.expectNext(1)
substreamConsumer1.requestMore(1)
substreamConsumer1.expectNext(3)
substreamConsumer2.requestMore(1)
substreamConsumer2.expectNext(3)
}
"work with fanout on master stream" in {
pending
}
"work with fanout on substreams and master stream" in {
pending
}
"abort on onError from upstream" in {
pending
}
}
}

View file

@@ -0,0 +1,48 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import org.reactivestreams.api.Producer
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.{ Stream, GeneratorSettings }
class StreamMergeSpec extends AkkaSpec {
import system.dispatcher
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2), system)
"merge" must {
"work in the happy case" in {
// Different input sizes (4 and 6)
val source1 = Stream((1 to 4).iterator).toProducer(gen)
val source2 = Stream((5 to 10).iterator).toProducer(gen)
val source3 = Stream(List.empty[Int].iterator).toProducer(gen)
val p = Stream(source1).merge(source2).merge(source3).toProducer(gen)
val probe = StreamTestKit.consumerProbe[Int]
p.produceTo(probe)
val subscription = probe.expectSubscription()
var collected = Set.empty[Int]
for (_ ← 1 to 10) {
subscription.requestMore(1)
collected += probe.expectNext()
}
collected should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
probe.expectComplete()
}
}
}

View file

@@ -0,0 +1,108 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import org.reactivestreams.api.Producer
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.{ Stream, GeneratorSettings }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamSplitWhenSpec extends AkkaSpec {
import system.dispatcher
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2), system)
case class StreamPuppet(p: Producer[Int]) {
val probe = StreamTestKit.consumerProbe[Int]
p.produceTo(probe)
val subscription = probe.expectSubscription()
def requestMore(demand: Int): Unit = subscription.requestMore(demand)
def expectNext(elem: Int): Unit = probe.expectNext(elem)
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def expectComplete(): Unit = probe.expectComplete()
def cancel(): Unit = subscription.cancel()
}
class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) {
val source = Stream((1 to elementCount).iterator).toProducer(gen)
val groupStream = Stream(source).splitWhen(_ == splitWhen).toProducer(gen)
val masterConsumer = StreamTestKit.consumerProbe[Producer[Int]]
groupStream.produceTo(masterConsumer)
val masterSubscription = masterConsumer.expectSubscription()
def getSubproducer(): Producer[Int] = {
masterSubscription.requestMore(1)
expectSubproducer()
}
def expectSubproducer(): Producer[Int] = {
val substream = masterConsumer.expectNext()
substream
}
}
"splitWhen" must {
"work in the happy case" in new SubstreamsSupport(elementCount = 4) {
val s1 = StreamPuppet(getSubproducer())
masterConsumer.expectNoMsg(100.millis)
s1.requestMore(2)
s1.expectNext(1)
s1.expectNext(2)
s1.expectComplete()
val s2 = StreamPuppet(getSubproducer())
masterConsumer.expectComplete()
s2.requestMore(1)
s2.expectNext(3)
s2.expectNoMsg(100.millis)
s2.requestMore(1)
s2.expectNext(4)
s2.expectComplete()
}
"support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubproducer())
s1.cancel()
val s2 = StreamPuppet(getSubproducer())
s2.requestMore(4)
s2.expectNext(5)
s2.expectNext(6)
s2.expectNext(7)
s2.expectNext(8)
s2.expectComplete()
masterConsumer.expectComplete()
}
"support cancelling the master stream" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) {
val s1 = StreamPuppet(getSubproducer())
masterSubscription.cancel()
s1.requestMore(4)
s1.expectNext(1)
s1.expectNext(2)
s1.expectNext(3)
s1.expectNext(4)
s1.expectComplete()
}
}
}

View file

@@ -179,7 +179,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
{ case (s, Failure(ex)) ⇒ (s + ex.getMessage, List(ex)) },
onComplete = x ⇒ List(TE(x.size + "10")))
.toProducer(gen)
val proc = p.expectSubscription
val proc = p.expectSubscription()
val c = StreamTestKit.consumerProbe[Throwable]
p2.produceTo(c)
val s = c.expectSubscription()
@@ -193,7 +193,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
"forward errors when received and thrown" in {
val p = StreamTestKit.producerProbe[Int]
val p2 = Stream(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(gen)
val proc = p.expectSubscription
val proc = p.expectSubscription()
val c = StreamTestKit.consumerProbe[Int]
p2.produceTo(c)
val s = c.expectSubscription()

View file

@@ -0,0 +1,46 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.Stream
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
class StreamZipSpec extends AkkaSpec {
import system.dispatcher
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2), system)
"Zip" must {
"work in the happy case" in {
// Different input sizes (4 and 6)
val source1 = Stream((1 to 4).iterator).toProducer(gen)
val source2 = Stream(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen)
val p = Stream(source1).zip(source2).toProducer(gen)
val probe = StreamTestKit.consumerProbe[(Int, String)]
p.produceTo(probe)
val subscription = probe.expectSubscription()
subscription.requestMore(2)
probe.expectNext((1, "A"))
probe.expectNext((2, "B"))
subscription.requestMore(1)
probe.expectNext((3, "C"))
subscription.requestMore(1)
probe.expectNext((4, "D"))
probe.expectComplete()
}
}
}