diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 2d6bef20b3..1ae507b830 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -24,7 +24,7 @@ private[akka] object ActorProcessor { case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) - case c: Concat ⇒ Props(new ConcatImpl(settings, c.other)) + case c: Concat ⇒ Props(new ConcatImpl(settings, c.next)) } } @@ -67,11 +67,12 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) case OnError(cause) ⇒ failureReceived(cause) } - def transitionToRunningWhenReady(): Unit = if (primaryInputs ne null) { - primaryInputs.prefetch() - transferState = initialTransferState - context.become(running) - } + def transitionToRunningWhenReady(): Unit = + if (primaryInputs ne null) { + primaryInputs.prefetch() + transferState = initialTransferState + context.become(running) + } ////////////////////// Management of subscribers ////////////////////// @@ -112,6 +113,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) // Called by SubscriberManagement whenever the output buffer is ready to accept additional elements override protected def requestFromUpstream(elements: Int): Unit = { + // FIXME: Remove debug logging log.debug(s"received downstream demand from buffer: $elements") PrimaryOutputs.enqueueOutputDemand(elements) } @@ -139,7 +141,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) def complete(): Unit = downstreamCompleted = true def cancel(): Unit = downstreamCompleted = true - def isComplete: Boolean = downstreamCompleted + def isClosed: Boolean = downstreamCompleted override val NeedsDemand: TransferState = new TransferState { def isReady = demandAvailable def isCompleted = downstreamCompleted @@ -150,7 +152,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) } } - def needsPrimaryInputAndDemand = primaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand + lazy val needsPrimaryInputAndDemand = primaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand var transferState: TransferState = _ protected def initialTransferState: TransferState @@ -159,16 +161,19 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) // Generate upstream requestMore for every Nth consumed input element protected def pump(): Unit = { try while (transferState.isExecutable) { - transferState = transfer() + // FIXME: Remove debug logging log.debug(s"iterating the pump with state $transferState and buffer $bufferDebug") + transferState = transfer() } catch { case NonFatal(e) ⇒ fail(e) } + // FIXME: Remove debug logging log.debug(s"finished iterating the pump with state $transferState and buffer $bufferDebug") if (transferState.isCompleted) { if (!isShuttingDown) { + // FIXME: Remove debug logging log.debug("shutting down the pump") - if (!primaryInputs.isCompleted) primaryInputs.cancel() + if (primaryInputs.isOpen) primaryInputs.cancel() primaryInputs.clear() context.become(flushing) isShuttingDown = true @@ -234,7 +239,7 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast var emits = immutable.Seq.empty[Any] object NeedsInputAndDemandOrCompletion extends TransferState { - def isReady = primaryInputs.inputsAvailable && PrimaryOutputs.demandAvailable || primaryInputs.inputsDepleted + def isReady = (primaryInputs.inputsAvailable && PrimaryOutputs.demandAvailable) || primaryInputs.inputsDepleted def isCompleted = false } @@ -271,13 +276,13 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast */ private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) { - val WrapInSuccess: Receive = { + val wrapInSuccess: Receive = { case OnNext(elem) ⇒ primaryInputs.enqueueInputElement(Success(elem)) pump() } - override def running: Receive = WrapInSuccess orElse super.running + override def running: Receive = wrapInSuccess orElse super.running override def failureReceived(e: Throwable): Unit = { primaryInputs.enqueueInputElement(Failure(e)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index a4aac4191d..4232904254 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -13,7 +13,7 @@ import akka.stream.impl._ */ private[akka] object GroupByProcessorImpl { - trait SubstreamElementState + sealed trait SubstreamElementState case object NoPending extends SubstreamElementState case class PendingElement(elem: Any, key: Any) extends SubstreamElementState case class PendingElementForNewStream(elem: Any, key: Any) extends SubstreamElementState @@ -32,36 +32,39 @@ private[akka] class GroupByProcessorImpl(settings: GeneratorSettings, val keyFor override def initialTransferState = needsPrimaryInputAndDemand - override def transfer(): TransferState = substreamPendingState match { - case PendingElementForNewStream(elem, key) ⇒ - if (PrimaryOutputs.isComplete) { + override def transfer(): TransferState = { + substreamPendingState match { + case PendingElementForNewStream(elem, key) ⇒ + if (PrimaryOutputs.isClosed) { + substreamPendingState = NoPending + // Just drop, we do not open any more substreams + } else { + val substreamOutput = newSubstream() + pushToDownstream((key, substreamOutput.processor)) + keyToSubstreamOutputs(key) = substreamOutput + substreamPendingState = PendingElement(elem, key) + } + + case PendingElement(elem, key) ⇒ + if (keyToSubstreamOutputs(key).isOpen) keyToSubstreamOutputs(key).enqueueOutputElement(elem) substreamPendingState = NoPending - // Just drop, we do not open any more substreams - primaryInputs.NeedsInput - } else { - val substreamOutput = newSubstream() - pushToDownstream((key, substreamOutput.processor)) - keyToSubstreamOutputs(key) = substreamOutput - substreamPendingState = PendingElement(elem, key) - substreamOutput.NeedsDemand - } - case PendingElement(elem, key) ⇒ - if (!keyToSubstreamOutputs(key).isComplete) keyToSubstreamOutputs(key).enqueueOutputElement(elem) - substreamPendingState = NoPending - primaryInputs.NeedsInput + case NoPending ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + if (keyToSubstreamOutputs.contains(key)) { + substreamPendingState = PendingElement(elem, key) + } else if (PrimaryOutputs.isOpen) { + substreamPendingState = PendingElementForNewStream(elem, key) + } - case NoPending ⇒ - val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - if (keyToSubstreamOutputs.contains(key)) { - substreamPendingState = PendingElement(elem, key) - keyToSubstreamOutputs(key).NeedsDemand - } else if (!PrimaryOutputs.isComplete) { - substreamPendingState = PendingElementForNewStream(elem, key) - PrimaryOutputs.NeedsDemand - } else primaryInputs.NeedsInput + } + substreamPendingState match { + case NoPending ⇒ primaryInputs.NeedsInput + case PendingElement(_, key) ⇒ keyToSubstreamOutputs(key).NeedsDemand + case PendingElementForNewStream(_, _) ⇒ PrimaryOutputs.NeedsDemand + } } override def invalidateSubstream(substream: ActorRef): Unit = { @@ -69,9 +72,6 @@ private[akka] class GroupByProcessorImpl(settings: GeneratorSettings, val keyFor case PendingElement(_, key) if keyToSubstreamOutputs(key).substream == substream ⇒ transferState = primaryInputs.NeedsInput substreamPendingState = NoPending - case PendingElementForNewStream(_, key) if keyToSubstreamOutputs(key).substream == substream ⇒ - transferState = primaryInputs.NeedsInput - substreamPendingState = NoPending case _ ⇒ } super.invalidateSubstream(substream) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala index 52e737ec6e..f33b13dfcc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala @@ -24,7 +24,7 @@ private[akka] object Ast { case class SplitWhen(p: Any ⇒ Boolean) extends AstNode case class Merge(other: Producer[Any]) extends AstNode case class Zip(other: Producer[Any]) extends AstNode - case class Concat(other: Producer[Any]) extends AstNode + case class Concat(next: Producer[Any]) extends AstNode trait ProducerNode[I] { def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index 15a0aca31e..fcaa4bffcb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -14,7 +14,7 @@ import akka.actor.Terminated */ private[akka] object SplitWhenProcessorImpl { - trait SubstreamElementState + sealed trait SubstreamElementState case object NoPending extends SubstreamElementState case class PendingElement(elem: Any) extends SubstreamElementState case class PendingElementForNewStream(elem: Any) extends SubstreamElementState @@ -33,31 +33,34 @@ private[akka] class SplitWhenProcessorImpl(_settings: GeneratorSettings, val spl override def initialTransferState = needsPrimaryInputAndDemand - override def transfer(): TransferState = pendingElement match { - case NoPending ⇒ - val elem = primaryInputs.dequeueInputElement() - if (!started) { - pendingElement = PendingElementForNewStream(elem) - started = true - PrimaryOutputs.NeedsDemand - } else if (splitPredicate(elem)) { - pendingElement = PendingElementForNewStream(elem) - currentSubstream.complete() - PrimaryOutputs.NeedsDemand - } else if (!currentSubstream.isComplete) { + override def transfer(): TransferState = { + pendingElement match { + case NoPending ⇒ + val elem = primaryInputs.dequeueInputElement() + if (!started) { + pendingElement = PendingElementForNewStream(elem) + started = true + } else if (splitPredicate(elem)) { + pendingElement = PendingElementForNewStream(elem) + currentSubstream.complete() + } else if (currentSubstream.isOpen) { + pendingElement = PendingElement(elem) + } else primaryInputs.NeedsInput + case PendingElement(elem) ⇒ + currentSubstream.enqueueOutputElement(elem) + pendingElement = NoPending + case PendingElementForNewStream(elem) ⇒ + val substreamOutput = newSubstream() + pushToDownstream(substreamOutput.processor) + currentSubstream = substreamOutput pendingElement = PendingElement(elem) - currentSubstream.NeedsDemand - } else primaryInputs.NeedsInput - case PendingElement(elem) ⇒ - currentSubstream.enqueueOutputElement(elem) - pendingElement = NoPending - primaryInputs.NeedsInput - case PendingElementForNewStream(elem) ⇒ - val substreamOutput = newSubstream() - pushToDownstream(substreamOutput.processor) - currentSubstream = substreamOutput - pendingElement = PendingElement(elem) - currentSubstream.NeedsDemand + } + + pendingElement match { + case NoPending ⇒ primaryInputs.NeedsInput + case PendingElement(_) ⇒ currentSubstream.NeedsDemand + case PendingElementForNewStream(_) ⇒ PrimaryOutputs.NeedsDemand + } } override def invalidateSubstream(substream: ActorRef): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala index 1917f2fcbb..0e8632add5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala @@ -13,7 +13,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any]) extends TwoStreamInputProcessor(_settings, _other) { - def needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && PrimaryOutputs.NeedsDemand + lazy val needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && PrimaryOutputs.NeedsDemand override def initialTransferState = needsAnyInputAndDemand override def transfer(): TransferState = { @@ -36,7 +36,7 @@ private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any]) extends TwoStreamInputProcessor(_settings, _other) { - def needsBothInputAndDemand = primaryInputs.NeedsInput && secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand + lazy val needsBothInputAndDemand = primaryInputs.NeedsInput && secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand override def initialTransferState = needsBothInputAndDemand override protected def transfer(): TransferState = { @@ -51,8 +51,8 @@ private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any]) private[akka] class ConcatImpl(_settings: GeneratorSettings, _other: Producer[Any]) extends TwoStreamInputProcessor(_settings, _other) { - def needsPrimaryInputAndDemandWithComplete = primaryInputs.NeedsInputOrComplete && PrimaryOutputs.NeedsDemand - def needsSecondaryInputAndDemand = secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand + lazy val needsPrimaryInputAndDemandWithComplete = primaryInputs.NeedsInputOrComplete && PrimaryOutputs.NeedsDemand + lazy val needsSecondaryInputAndDemand = secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand var processingPrimary = true override protected def initialTransferState: TransferState = needsPrimaryInputAndDemandWithComplete diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index ce416df41d..dfb38aa661 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -15,7 +15,7 @@ private[akka] object MultiStreamOutputProcessor { case class SubstreamRequestMore(substream: ActorRef, demand: Int) case class SubstreamCancel(substream: ActorRef) - class SubstreamSubscription( final val parent: ActorRef, final val substream: ActorRef) extends Subscription { + class SubstreamSubscription(val parent: ActorRef, val substream: ActorRef) extends Subscription { override def requestMore(elements: Int): Unit = if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") else parent ! SubstreamRequestMore(substream, elements) @@ -40,7 +40,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings))) val processor = new ActorProcessor[AnyRef, AnyRef](substream) - override def isComplete: Boolean = completed + override def isClosed: Boolean = completed override def complete(): Unit = { if (!completed) substream ! OnComplete completed = true @@ -52,6 +52,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett demands -= 1 substream ! OnNext(elem) } + def enqueueOutputDemand(demand: Int): Unit = demands += demand override def demandAvailable: Boolean = demands > 0 override val NeedsDemand: TransferState = new TransferState { @@ -59,7 +60,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett override def isCompleted: Boolean = completed } override val NeedsDemandOrCancel: TransferState = new TransferState { - override def isReady: Boolean = demandAvailable || isComplete + override def isReady: Boolean = demandAvailable || isClosed override def isCompleted: Boolean = false } } @@ -74,7 +75,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett protected def invalidateSubstream(substream: ActorRef): Unit = { substreamOutputs(substream).complete() substreamOutputs -= substream - if ((isShuttingDown || PrimaryOutputs.isComplete) && context.children.isEmpty) context.stop(self) + if ((isShuttingDown || PrimaryOutputs.isClosed) && context.children.isEmpty) context.stop(self) pump() } @@ -84,12 +85,10 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett } override def shutdown(completed: Boolean): Unit = { - // If the master stream is cancelled (no one consumes substreams) then this callback does not mean we are shutting down - // We can only shut down after all substreams are closed - //if (!PrimaryOutputs.isComplete) { + // If the master stream is cancelled (no one consumes substreams as elements from the master stream) + // then this callback does not mean we are shutting down + // We can only shut down after all substreams (our children) are closed if (context.children.isEmpty) super.shutdown(completed) - //} - } override def completeDownstream(): Unit = { @@ -111,7 +110,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSett * INTERNAL API */ private[akka] object TwoStreamInputProcessor { - class OtherActorSubscriber[T]( final val impl: ActorRef) extends Subscriber[T] { + class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] { override def onError(cause: Throwable): Unit = impl ! OnError(cause) override def onComplete(): Unit = impl ! OtherStreamOnComplete override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element) @@ -155,7 +154,7 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: GeneratorSetting } override def flushAndComplete(): Unit = { - if (secondaryInputs.isCompleted && primaryInputs.isCompleted) + if (secondaryInputs.isClosed && primaryInputs.isClosed) super.flushAndComplete() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala index cbce375d7f..272a80df23 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -15,7 +15,8 @@ trait Inputs { def cancel(): Unit def complete(): Unit - def isCompleted: Boolean + def isClosed: Boolean + def isOpen: Boolean = !isClosed def prefetch(): Unit def clear(): Unit @@ -33,7 +34,8 @@ trait Outputs { def complete(): Unit def cancel(): Unit - def isComplete: Boolean + def isClosed: Boolean + def isOpen: Boolean = !isClosed } // States of the operation that is executed by this processor @@ -61,7 +63,7 @@ object Completed extends TransferState { object EmptyInputs extends Inputs { override def inputsAvailable: Boolean = false override def inputsDepleted: Boolean = true - override def isCompleted: Boolean = true + override def isClosed: Boolean = true override def complete(): Unit = () override def cancel(): Unit = () @@ -84,14 +86,14 @@ class BatchingInputBuffer(val upstream: Subscription, val size: Int) extends Inp private var inputBufferElements = 0 private var nextInputElementCursor = 0 private var upstreamCompleted = false - val IndexMask = size - 1 + private val IndexMask = size - 1 private def requestBatchSize = math.max(1, inputBuffer.length / 2) private var batchRemaining = requestBatchSize - def prefetch(): Unit = upstream.requestMore(inputBuffer.length) + override def prefetch(): Unit = upstream.requestMore(inputBuffer.length) - def dequeueInputElement(): Any = { + override def dequeueInputElement(): Any = { val elem = inputBuffer(nextInputElementCursor) inputBuffer(nextInputElementCursor) = null @@ -107,25 +109,26 @@ class BatchingInputBuffer(val upstream: Subscription, val size: Int) extends Inp elem } - def enqueueInputElement(elem: Any): Unit = { - inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef] - inputBufferElements += 1 - } + override def enqueueInputElement(elem: Any): Unit = + if (isOpen) { + inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef] + inputBufferElements += 1 + } - def complete(): Unit = upstreamCompleted = true - def cancel(): Unit = { + override def complete(): Unit = upstreamCompleted = true + override def cancel(): Unit = { if (!upstreamCompleted) upstream.cancel() upstreamCompleted = true } - def isCompleted: Boolean = upstreamCompleted + override def isClosed: Boolean = upstreamCompleted - def clear(): Unit = { + override def clear(): Unit = { Arrays.fill(inputBuffer, 0, inputBuffer.length, null) inputBufferElements = 0 } - def inputsDepleted = upstreamCompleted && inputBufferElements == 0 - def inputsAvailable = inputBufferElements > 0 + override def inputsDepleted = upstreamCompleted && inputBufferElements == 0 + override def inputsAvailable = inputBufferElements > 0 override val NeedsInput: TransferState = new TransferState { def isReady = inputsAvailable diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala index 4cbe897f23..b112d4e2fc 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala @@ -182,6 +182,21 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor subscription.requestMore(2) consumer.expectNoMsg(200.millis) } + + "support producing elements from empty inputs" in { + val p = Stream(List.empty[Int].iterator).toProducer(gen) + val p2 = Stream(p).transform(List(1, 2, 3))((s, _) ⇒ (s, Nil), onComplete = s ⇒ s). + toProducer(gen) + val consumer = StreamTestKit.consumerProbe[Int] + p2.produceTo(consumer) + val subscription = consumer.expectSubscription() + subscription.requestMore(4) + consumer.expectNext(1) + consumer.expectNext(2) + consumer.expectNext(3) + consumer.expectComplete() + + } } } \ No newline at end of file