!str Fixed comments round 1

This commit is contained in:
Endre Sándor Varga 2014-04-01 12:45:41 +02:00 committed by Roland Kuhn
parent 429f68e9d9
commit 441144e44f
8 changed files with 125 additions and 100 deletions

View file

@ -24,7 +24,7 @@ private[akka] object ActorProcessor {
case g: GroupBy Props(new GroupByProcessorImpl(settings, g.f))
case m: Merge Props(new MergeImpl(settings, m.other))
case z: Zip Props(new ZipImpl(settings, z.other))
case c: Concat Props(new ConcatImpl(settings, c.other))
case c: Concat Props(new ConcatImpl(settings, c.next))
}
}
@ -67,11 +67,12 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
case OnError(cause) failureReceived(cause)
}
def transitionToRunningWhenReady(): Unit = if (primaryInputs ne null) {
primaryInputs.prefetch()
transferState = initialTransferState
context.become(running)
}
def transitionToRunningWhenReady(): Unit =
if (primaryInputs ne null) {
primaryInputs.prefetch()
transferState = initialTransferState
context.become(running)
}
////////////////////// Management of subscribers //////////////////////
@ -112,6 +113,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
// Called by SubscriberManagement whenever the output buffer is ready to accept additional elements
override protected def requestFromUpstream(elements: Int): Unit = {
// FIXME: Remove debug logging
log.debug(s"received downstream demand from buffer: $elements")
PrimaryOutputs.enqueueOutputDemand(elements)
}
@ -139,7 +141,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
def complete(): Unit = downstreamCompleted = true
def cancel(): Unit = downstreamCompleted = true
def isComplete: Boolean = downstreamCompleted
def isClosed: Boolean = downstreamCompleted
override val NeedsDemand: TransferState = new TransferState {
def isReady = demandAvailable
def isCompleted = downstreamCompleted
@ -150,7 +152,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
}
}
def needsPrimaryInputAndDemand = primaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
lazy val needsPrimaryInputAndDemand = primaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
var transferState: TransferState = _
protected def initialTransferState: TransferState
@ -159,16 +161,19 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
// Generate upstream requestMore for every Nth consumed input element
protected def pump(): Unit = {
try while (transferState.isExecutable) {
transferState = transfer()
// FIXME: Remove debug logging
log.debug(s"iterating the pump with state $transferState and buffer $bufferDebug")
transferState = transfer()
} catch { case NonFatal(e) fail(e) }
// FIXME: Remove debug logging
log.debug(s"finished iterating the pump with state $transferState and buffer $bufferDebug")
if (transferState.isCompleted) {
if (!isShuttingDown) {
// FIXME: Remove debug logging
log.debug("shutting down the pump")
if (!primaryInputs.isCompleted) primaryInputs.cancel()
if (primaryInputs.isOpen) primaryInputs.cancel()
primaryInputs.clear()
context.become(flushing)
isShuttingDown = true
@ -234,7 +239,7 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast
var emits = immutable.Seq.empty[Any]
object NeedsInputAndDemandOrCompletion extends TransferState {
def isReady = primaryInputs.inputsAvailable && PrimaryOutputs.demandAvailable || primaryInputs.inputsDepleted
def isReady = (primaryInputs.inputsAvailable && PrimaryOutputs.demandAvailable) || primaryInputs.inputsDepleted
def isCompleted = false
}
@ -271,13 +276,13 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast
*/
private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) {
val WrapInSuccess: Receive = {
val wrapInSuccess: Receive = {
case OnNext(elem)
primaryInputs.enqueueInputElement(Success(elem))
pump()
}
override def running: Receive = WrapInSuccess orElse super.running
override def running: Receive = wrapInSuccess orElse super.running
override def failureReceived(e: Throwable): Unit = {
primaryInputs.enqueueInputElement(Failure(e))

View file

@ -13,7 +13,7 @@ import akka.stream.impl._
*/
private[akka] object GroupByProcessorImpl {
trait SubstreamElementState
sealed trait SubstreamElementState
case object NoPending extends SubstreamElementState
case class PendingElement(elem: Any, key: Any) extends SubstreamElementState
case class PendingElementForNewStream(elem: Any, key: Any) extends SubstreamElementState
@ -32,36 +32,39 @@ private[akka] class GroupByProcessorImpl(settings: GeneratorSettings, val keyFor
override def initialTransferState = needsPrimaryInputAndDemand
override def transfer(): TransferState = substreamPendingState match {
case PendingElementForNewStream(elem, key)
if (PrimaryOutputs.isComplete) {
override def transfer(): TransferState = {
substreamPendingState match {
case PendingElementForNewStream(elem, key)
if (PrimaryOutputs.isClosed) {
substreamPendingState = NoPending
// Just drop, we do not open any more substreams
} else {
val substreamOutput = newSubstream()
pushToDownstream((key, substreamOutput.processor))
keyToSubstreamOutputs(key) = substreamOutput
substreamPendingState = PendingElement(elem, key)
}
case PendingElement(elem, key)
if (keyToSubstreamOutputs(key).isOpen) keyToSubstreamOutputs(key).enqueueOutputElement(elem)
substreamPendingState = NoPending
// Just drop, we do not open any more substreams
primaryInputs.NeedsInput
} else {
val substreamOutput = newSubstream()
pushToDownstream((key, substreamOutput.processor))
keyToSubstreamOutputs(key) = substreamOutput
substreamPendingState = PendingElement(elem, key)
substreamOutput.NeedsDemand
}
case PendingElement(elem, key)
if (!keyToSubstreamOutputs(key).isComplete) keyToSubstreamOutputs(key).enqueueOutputElement(elem)
substreamPendingState = NoPending
primaryInputs.NeedsInput
case NoPending
val elem = primaryInputs.dequeueInputElement()
val key = keyFor(elem)
if (keyToSubstreamOutputs.contains(key)) {
substreamPendingState = PendingElement(elem, key)
} else if (PrimaryOutputs.isOpen) {
substreamPendingState = PendingElementForNewStream(elem, key)
}
case NoPending
val elem = primaryInputs.dequeueInputElement()
val key = keyFor(elem)
if (keyToSubstreamOutputs.contains(key)) {
substreamPendingState = PendingElement(elem, key)
keyToSubstreamOutputs(key).NeedsDemand
} else if (!PrimaryOutputs.isComplete) {
substreamPendingState = PendingElementForNewStream(elem, key)
PrimaryOutputs.NeedsDemand
} else primaryInputs.NeedsInput
}
substreamPendingState match {
case NoPending primaryInputs.NeedsInput
case PendingElement(_, key) keyToSubstreamOutputs(key).NeedsDemand
case PendingElementForNewStream(_, _) PrimaryOutputs.NeedsDemand
}
}
override def invalidateSubstream(substream: ActorRef): Unit = {
@ -69,9 +72,6 @@ private[akka] class GroupByProcessorImpl(settings: GeneratorSettings, val keyFor
case PendingElement(_, key) if keyToSubstreamOutputs(key).substream == substream
transferState = primaryInputs.NeedsInput
substreamPendingState = NoPending
case PendingElementForNewStream(_, key) if keyToSubstreamOutputs(key).substream == substream
transferState = primaryInputs.NeedsInput
substreamPendingState = NoPending
case _
}
super.invalidateSubstream(substream)

View file

@ -24,7 +24,7 @@ private[akka] object Ast {
case class SplitWhen(p: Any Boolean) extends AstNode
case class Merge(other: Producer[Any]) extends AstNode
case class Zip(other: Producer[Any]) extends AstNode
case class Concat(other: Producer[Any]) extends AstNode
case class Concat(next: Producer[Any]) extends AstNode
trait ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I]

View file

@ -14,7 +14,7 @@ import akka.actor.Terminated
*/
private[akka] object SplitWhenProcessorImpl {
trait SubstreamElementState
sealed trait SubstreamElementState
case object NoPending extends SubstreamElementState
case class PendingElement(elem: Any) extends SubstreamElementState
case class PendingElementForNewStream(elem: Any) extends SubstreamElementState
@ -33,31 +33,34 @@ private[akka] class SplitWhenProcessorImpl(_settings: GeneratorSettings, val spl
override def initialTransferState = needsPrimaryInputAndDemand
override def transfer(): TransferState = pendingElement match {
case NoPending
val elem = primaryInputs.dequeueInputElement()
if (!started) {
pendingElement = PendingElementForNewStream(elem)
started = true
PrimaryOutputs.NeedsDemand
} else if (splitPredicate(elem)) {
pendingElement = PendingElementForNewStream(elem)
currentSubstream.complete()
PrimaryOutputs.NeedsDemand
} else if (!currentSubstream.isComplete) {
override def transfer(): TransferState = {
pendingElement match {
case NoPending
val elem = primaryInputs.dequeueInputElement()
if (!started) {
pendingElement = PendingElementForNewStream(elem)
started = true
} else if (splitPredicate(elem)) {
pendingElement = PendingElementForNewStream(elem)
currentSubstream.complete()
} else if (currentSubstream.isOpen) {
pendingElement = PendingElement(elem)
} else primaryInputs.NeedsInput
case PendingElement(elem)
currentSubstream.enqueueOutputElement(elem)
pendingElement = NoPending
case PendingElementForNewStream(elem)
val substreamOutput = newSubstream()
pushToDownstream(substreamOutput.processor)
currentSubstream = substreamOutput
pendingElement = PendingElement(elem)
currentSubstream.NeedsDemand
} else primaryInputs.NeedsInput
case PendingElement(elem)
currentSubstream.enqueueOutputElement(elem)
pendingElement = NoPending
primaryInputs.NeedsInput
case PendingElementForNewStream(elem)
val substreamOutput = newSubstream()
pushToDownstream(substreamOutput.processor)
currentSubstream = substreamOutput
pendingElement = PendingElement(elem)
currentSubstream.NeedsDemand
}
pendingElement match {
case NoPending primaryInputs.NeedsInput
case PendingElement(_) currentSubstream.NeedsDemand
case PendingElementForNewStream(_) PrimaryOutputs.NeedsDemand
}
}
override def invalidateSubstream(substream: ActorRef): Unit = {

View file

@ -13,7 +13,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom
private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any])
extends TwoStreamInputProcessor(_settings, _other) {
def needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && PrimaryOutputs.NeedsDemand
lazy val needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && PrimaryOutputs.NeedsDemand
override def initialTransferState = needsAnyInputAndDemand
override def transfer(): TransferState = {
@ -36,7 +36,7 @@ private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any
private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any])
extends TwoStreamInputProcessor(_settings, _other) {
def needsBothInputAndDemand = primaryInputs.NeedsInput && secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
lazy val needsBothInputAndDemand = primaryInputs.NeedsInput && secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
override def initialTransferState = needsBothInputAndDemand
override protected def transfer(): TransferState = {
@ -51,8 +51,8 @@ private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any])
private[akka] class ConcatImpl(_settings: GeneratorSettings, _other: Producer[Any])
extends TwoStreamInputProcessor(_settings, _other) {
def needsPrimaryInputAndDemandWithComplete = primaryInputs.NeedsInputOrComplete && PrimaryOutputs.NeedsDemand
def needsSecondaryInputAndDemand = secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
lazy val needsPrimaryInputAndDemandWithComplete = primaryInputs.NeedsInputOrComplete && PrimaryOutputs.NeedsDemand
lazy val needsSecondaryInputAndDemand = secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
var processingPrimary = true
override protected def initialTransferState: TransferState = needsPrimaryInputAndDemandWithComplete

View file

@ -15,7 +15,7 @@ private[akka] object MultiStreamOutputProcessor {
case class SubstreamRequestMore(substream: ActorRef, demand: Int)
case class SubstreamCancel(substream: ActorRef)
class SubstreamSubscription( final val parent: ActorRef, final val substream: ActorRef) extends Subscription {
class SubstreamSubscription(val parent: ActorRef, val substream: ActorRef) extends Subscription {
override def requestMore(elements: Int): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
else parent ! SubstreamRequestMore(substream, elements)
@ -40,7 +40,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett
val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings)))
val processor = new ActorProcessor[AnyRef, AnyRef](substream)
override def isComplete: Boolean = completed
override def isClosed: Boolean = completed
override def complete(): Unit = {
if (!completed) substream ! OnComplete
completed = true
@ -52,6 +52,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett
demands -= 1
substream ! OnNext(elem)
}
def enqueueOutputDemand(demand: Int): Unit = demands += demand
override def demandAvailable: Boolean = demands > 0
override val NeedsDemand: TransferState = new TransferState {
@ -59,7 +60,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett
override def isCompleted: Boolean = completed
}
override val NeedsDemandOrCancel: TransferState = new TransferState {
override def isReady: Boolean = demandAvailable || isComplete
override def isReady: Boolean = demandAvailable || isClosed
override def isCompleted: Boolean = false
}
}
@ -74,7 +75,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett
protected def invalidateSubstream(substream: ActorRef): Unit = {
substreamOutputs(substream).complete()
substreamOutputs -= substream
if ((isShuttingDown || PrimaryOutputs.isComplete) && context.children.isEmpty) context.stop(self)
if ((isShuttingDown || PrimaryOutputs.isClosed) && context.children.isEmpty) context.stop(self)
pump()
}
@ -84,12 +85,10 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett
}
override def shutdown(completed: Boolean): Unit = {
// If the master stream is cancelled (no one consumes substreams) then this callback does not mean we are shutting down
// We can only shut down after all substreams are closed
//if (!PrimaryOutputs.isComplete) {
// If the master stream is cancelled (no one consumes substreams as elements from the master stream)
// then this callback does not mean we are shutting down
// We can only shut down after all substreams (our children) are closed
if (context.children.isEmpty) super.shutdown(completed)
//}
}
override def completeDownstream(): Unit = {
@ -111,7 +110,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett
* INTERNAL API
*/
private[akka] object TwoStreamInputProcessor {
class OtherActorSubscriber[T]( final val impl: ActorRef) extends Subscriber[T] {
class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! OnError(cause)
override def onComplete(): Unit = impl ! OtherStreamOnComplete
override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element)
@ -155,7 +154,7 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: GeneratorSetting
}
override def flushAndComplete(): Unit = {
if (secondaryInputs.isCompleted && primaryInputs.isCompleted)
if (secondaryInputs.isClosed && primaryInputs.isClosed)
super.flushAndComplete()
}

View file

@ -15,7 +15,8 @@ trait Inputs {
def cancel(): Unit
def complete(): Unit
def isCompleted: Boolean
def isClosed: Boolean
def isOpen: Boolean = !isClosed
def prefetch(): Unit
def clear(): Unit
@ -33,7 +34,8 @@ trait Outputs {
def complete(): Unit
def cancel(): Unit
def isComplete: Boolean
def isClosed: Boolean
def isOpen: Boolean = !isClosed
}
// States of the operation that is executed by this processor
@ -61,7 +63,7 @@ object Completed extends TransferState {
object EmptyInputs extends Inputs {
override def inputsAvailable: Boolean = false
override def inputsDepleted: Boolean = true
override def isCompleted: Boolean = true
override def isClosed: Boolean = true
override def complete(): Unit = ()
override def cancel(): Unit = ()
@ -84,14 +86,14 @@ class BatchingInputBuffer(val upstream: Subscription, val size: Int) extends Inp
private var inputBufferElements = 0
private var nextInputElementCursor = 0
private var upstreamCompleted = false
val IndexMask = size - 1
private val IndexMask = size - 1
private def requestBatchSize = math.max(1, inputBuffer.length / 2)
private var batchRemaining = requestBatchSize
def prefetch(): Unit = upstream.requestMore(inputBuffer.length)
override def prefetch(): Unit = upstream.requestMore(inputBuffer.length)
def dequeueInputElement(): Any = {
override def dequeueInputElement(): Any = {
val elem = inputBuffer(nextInputElementCursor)
inputBuffer(nextInputElementCursor) = null
@ -107,25 +109,26 @@ class BatchingInputBuffer(val upstream: Subscription, val size: Int) extends Inp
elem
}
def enqueueInputElement(elem: Any): Unit = {
inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef]
inputBufferElements += 1
}
override def enqueueInputElement(elem: Any): Unit =
if (isOpen) {
inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef]
inputBufferElements += 1
}
def complete(): Unit = upstreamCompleted = true
def cancel(): Unit = {
override def complete(): Unit = upstreamCompleted = true
override def cancel(): Unit = {
if (!upstreamCompleted) upstream.cancel()
upstreamCompleted = true
}
def isCompleted: Boolean = upstreamCompleted
override def isClosed: Boolean = upstreamCompleted
def clear(): Unit = {
override def clear(): Unit = {
Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
inputBufferElements = 0
}
def inputsDepleted = upstreamCompleted && inputBufferElements == 0
def inputsAvailable = inputBufferElements > 0
override def inputsDepleted = upstreamCompleted && inputBufferElements == 0
override def inputsAvailable = inputBufferElements > 0
override val NeedsInput: TransferState = new TransferState {
def isReady = inputsAvailable

View file

@ -182,6 +182,21 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
subscription.requestMore(2)
consumer.expectNoMsg(200.millis)
}
"support producing elements from empty inputs" in {
val p = Stream(List.empty[Int].iterator).toProducer(gen)
val p2 = Stream(p).transform(List(1, 2, 3))((s, _) (s, Nil), onComplete = s s).
toProducer(gen)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
subscription.requestMore(4)
consumer.expectNext(1)
consumer.expectNext(2)
consumer.expectNext(3)
consumer.expectComplete()
}
}
}