From 8265f923307c9f98fdf3daecb5de9eb2e879b82b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bjo=CC=88rn=20Antonsson?= Date: Wed, 10 Sep 2014 12:14:09 +0200 Subject: [PATCH] =str #15402 Split up MultiStream(Input|Output)Processor since SSL will use both --- .../akka/stream/impl/ConcatAllImpl.scala | 8 +- .../stream/impl/GroupByProcessorImpl.scala | 138 ++++++++-------- .../akka/stream/impl/PrefixAndTailImpl.scala | 6 +- .../stream/impl/SplitWhenProcessorImpl.scala | 14 +- .../impl/StreamOfStreamProcessors.scala | 153 ++++++++++++------ .../akka/stream/impl2/ConcatAllImpl.scala | 13 +- .../stream/impl2/GroupByProcessorImpl.scala | 145 +++++++++-------- .../akka/stream/impl2/PrefixAndTailImpl.scala | 6 +- .../stream/impl2/SplitWhenProcessorImpl.scala | 15 +- 9 files changed, 279 insertions(+), 219 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala index fccb2a018f..651e9718c9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala @@ -12,13 +12,15 @@ import akka.stream.impl.MultiStreamInputProcessor.SubstreamKey */ private[akka] class ConcatAllImpl(_settings: MaterializerSettings) extends MultiStreamInputProcessor(_settings) { + import MultiStreamInputProcessor._ + val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ val publisher = primaryInputs.dequeueInputElement().asInstanceOf[Publisher[Any]] - val inputs = createSubstreamInputs(publisher) + val inputs = createAndSubscribeSubstreamInput(publisher) nextPhase(streamSubstream(inputs)) } - def streamSubstream(substream: SubstreamInputs): TransferPhase = + def streamSubstream(substream: SubstreamInput): TransferPhase = TransferPhase(substream.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { () ⇒ if (substream.inputsDepleted) nextPhase(takeNextSubstream) else primaryOutputs.enqueueOutputElement(substream.dequeueInputElement()) @@ -26,5 +28,5 @@ private[akka] class ConcatAllImpl(_settings: MaterializerSettings) extends Multi nextPhase(takeNextSubstream) - override def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = fail(e) + override def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = fail(e) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index 6ec5287d57..eacd4e6343 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -1,68 +1,70 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.stream.impl - -import akka.actor.{ Terminated, Props, ActorRef } -import akka.stream.MaterializerSettings -import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey - -/** - * INTERNAL API - */ -private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { - var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs] - - var pendingSubstreamOutputs: SubstreamOutputs = _ - - // No substream is open yet. If downstream cancels now, we are complete - val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - nextPhase(openSubstream(elem, key)) - } - - // some substreams are open now. If downstream cancels, we still continue until the substreams are closed - val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - - keyToSubstreamOutputs.get(key) match { - case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutputs(key))) - case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) - case _ ⇒ // stay - } - } - - def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - if (primaryOutputs.isClosed) { - // Just drop, we do not open any more substreams - nextPhase(waitNext) - } else { - val substreamOutput = newSubstream() - primaryOutputs.enqueueOutputElement((key, substreamOutput)) - keyToSubstreamOutputs(key) = substreamOutput - nextPhase(dispatchToSubstream(elem, substreamOutput)) - } - } - - def dispatchToSubstream(elem: Any, substream: SubstreamOutputs): TransferPhase = { - pendingSubstreamOutputs = substream - TransferPhase(substream.NeedsDemand) { () ⇒ - substream.enqueueOutputElement(elem) - pendingSubstreamOutputs = null - nextPhase(waitNext) - } - } - - nextPhase(waitFirst) - - override def invalidateSubstream(substream: SubstreamKey): Unit = { - if ((pendingSubstreamOutputs ne null) && substream == pendingSubstreamOutputs.key) { - pendingSubstreamOutputs = null - nextPhase(waitNext) - } - super.invalidateSubstream(substream) - } - -} +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.{ Terminated, Props, ActorRef } +import akka.stream.MaterializerSettings +import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey + +/** + * INTERNAL API + */ +private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { + import MultiStreamOutputProcessor._ + + var keyToSubstreamOutput = collection.mutable.Map.empty[Any, SubstreamOutput] + + var pendingSubstreamOutput: SubstreamOutput = _ + + // No substream is open yet. If downstream cancels now, we are complete + val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + nextPhase(openSubstream(elem, key)) + } + + // some substreams are open now. If downstream cancels, we still continue until the substreams are closed + val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + + keyToSubstreamOutput.get(key) match { + case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutput(key))) + case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) + case _ ⇒ // stay + } + } + + def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + if (primaryOutputs.isClosed) { + // Just drop, we do not open any more substreams + nextPhase(waitNext) + } else { + val substreamOutput = createSubstreamOutput() + primaryOutputs.enqueueOutputElement((key, substreamOutput)) + keyToSubstreamOutput(key) = substreamOutput + nextPhase(dispatchToSubstream(elem, substreamOutput)) + } + } + + def dispatchToSubstream(elem: Any, substream: SubstreamOutput): TransferPhase = { + pendingSubstreamOutput = substream + TransferPhase(substream.NeedsDemand) { () ⇒ + substream.enqueueOutputElement(elem) + pendingSubstreamOutput = null + nextPhase(waitNext) + } + } + + nextPhase(waitFirst) + + override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { + if ((pendingSubstreamOutput ne null) && substream == pendingSubstreamOutput.key) { + pendingSubstreamOutput = null + nextPhase(waitNext) + } + super.invalidateSubstreamOutput(substream) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala index 5567c15d86..a520d731cc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala @@ -12,6 +12,8 @@ import scala.collection.immutable private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeMax: Int) extends MultiStreamOutputProcessor(_settings) { + import MultiStreamOutputProcessor._ + var taken = immutable.Vector.empty[Any] var left = takeMax @@ -28,7 +30,7 @@ private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeM } } - def streamTailPhase(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ + def streamTailPhase(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ substream.enqueueOutputElement(primaryInputs.dequeueInputElement()) } @@ -43,7 +45,7 @@ private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeM } def emitNonEmptyTail(): Unit = { - val substreamOutput = newSubstream() + val substreamOutput = createSubstreamOutput() primaryOutputs.enqueueOutputElement((taken, substreamOutput)) primaryOutputs.complete() nextPhase(streamTailPhase(substreamOutput)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index 1b30da06fb..af6e952500 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -12,27 +12,29 @@ import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val splitPredicate: Any ⇒ Boolean) extends MultiStreamOutputProcessor(_settings) { - var currentSubstream: SubstreamOutputs = _ + import MultiStreamOutputProcessor._ + + var currentSubstream: SubstreamOutput = _ val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ nextPhase(openSubstream(primaryInputs.dequeueInputElement())) } def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - val substreamOutput = newSubstream() + val substreamOutput = createSubstreamOutput() primaryOutputs.enqueueOutputElement(substreamOutput) currentSubstream = substreamOutput nextPhase(serveSubstreamFirst(currentSubstream, elem)) } // Serving the substream is split into two phases to minimize elements "held in hand" - def serveSubstreamFirst(substream: SubstreamOutputs, elem: Any) = TransferPhase(substream.NeedsDemand) { () ⇒ + def serveSubstreamFirst(substream: SubstreamOutput, elem: Any) = TransferPhase(substream.NeedsDemand) { () ⇒ substream.enqueueOutputElement(elem) nextPhase(serveSubstreamRest(substream)) } // Note that this phase is allocated only once per _slice_ and not per element - def serveSubstreamRest(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ + def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ val elem = primaryInputs.dequeueInputElement() if (splitPredicate(elem)) { currentSubstream.complete() @@ -49,9 +51,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val nextPhase(waitFirst) - override def invalidateSubstream(substream: SubstreamKey): Unit = { + override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) - super.invalidateSubstream(substream) + super.invalidateSubstreamOutput(substream) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 43c48f5603..04fea12684 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -27,26 +27,20 @@ private[akka] object MultiStreamOutputProcessor { override def toString = "SubstreamSubscription" + System.identityHashCode(this) } -} - -/** - * INTERNAL API - */ -private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) { - import MultiStreamOutputProcessor._ - private var nextId = 0 - private val substreamOutputs = mutable.Map.empty[SubstreamKey, SubstreamOutputs] - - class SubstreamOutputs(val key: SubstreamKey) extends SimpleOutputs(self, this) with Publisher[Any] { - + object SubstreamOutput { sealed trait PublisherState sealed trait CompletedState extends PublisherState case object Open extends PublisherState final case class Attached(sub: Subscriber[Any]) extends PublisherState case object Completed extends CompletedState final case class Failed(e: Throwable) extends CompletedState + } - private val subscription = new SubstreamSubscription(self, key) + class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump) extends SimpleOutputs(actor, pump) with Publisher[Any] { + + import SubstreamOutput._ + + private val subscription = new SubstreamSubscription(actor, key) private val state = new AtomicReference[PublisherState](Open) override def subreceive: SubReceive = @@ -75,7 +69,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS state.getAndSet(withState) match { case Attached(sub) ⇒ closeSubscriber(sub, withState) case _: CompletedState ⇒ throw new IllegalStateException("Attempted to double shutdown publisher") - case Open ⇒ // No action needed + case Open ⇒ // No action needed } } @@ -85,7 +79,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS } override def subscribe(s: Subscriber[Any]): Unit = { - if (state.compareAndSet(Open, Attached(s))) self ! SubstreamSubscribe(key, s) + if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) else { state.get() match { case _: Attached ⇒ s.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) @@ -101,38 +95,64 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS subscriber.onSubscribe(subscription) } else subscriber.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) } +} - protected def newSubstream(): SubstreamOutputs = { - val id = SubstreamKey(nextId) - nextId += 1 - val outputs = new SubstreamOutputs(id) +/** + * INTERNAL API + */ +private[akka] trait MultiStreamOutputProcessorLike extends Pump { this: Actor ⇒ + import MultiStreamOutputProcessor._ + + protected def nextId(): Long + + private val substreamOutputs = mutable.Map.empty[SubstreamKey, SubstreamOutput] + + protected def createSubstreamOutput(): SubstreamOutput = { + val id = SubstreamKey(nextId()) + val outputs = new SubstreamOutput(id, self, this) substreamOutputs(outputs.key) = outputs outputs } - protected def invalidateSubstream(substream: SubstreamKey): Unit = { + protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { substreamOutputs(substream).complete() substreamOutputs -= substream pump() } - override def fail(e: Throwable): Unit = { + protected def failOutputs(e: Throwable): Unit = { substreamOutputs.values foreach (_.cancel(e)) + } + + protected def finishOutputs(): Unit = { + substreamOutputs.values foreach (_.complete()) + } + + val outputSubstreamManagement: Receive = { + case SubstreamRequestMore(key, demand) ⇒ substreamOutputs(key).enqueueOutputDemand(demand) + case SubstreamCancel(key) ⇒ invalidateSubstreamOutput(key) + case SubstreamSubscribe(key, subscriber) ⇒ substreamOutputs(key).attachSubscriber(subscriber) + } +} + +/** + * INTERNAL API + */ +private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) with MultiStreamOutputProcessorLike { + private var _nextId = 0L + protected def nextId(): Long = { _nextId += 1; _nextId } + + override protected def fail(e: Throwable): Unit = { + failOutputs(e) super.fail(e) } override def pumpFinished(): Unit = { - substreamOutputs.values foreach (_.complete()) + finishOutputs() super.pumpFinished() } - val substreamManagement: Receive = { - case SubstreamRequestMore(key, demand) ⇒ substreamOutputs(key).enqueueOutputDemand(demand) - case SubstreamCancel(key) ⇒ invalidateSubstream(key) - case SubstreamSubscribe(key, subscriber) ⇒ substreamOutputs(key).attachSubscriber(subscriber) - } - - override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement + override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement } /** @@ -196,7 +216,7 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett * INTERNAL API */ private[akka] object MultiStreamInputProcessor { - case class SubstreamKey(id: Int) + case class SubstreamKey(id: Long) class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends Subscriber[T] { override def onError(cause: Throwable): Unit = impl ! SubstreamOnError(key, cause) @@ -209,18 +229,8 @@ private[akka] object MultiStreamInputProcessor { case class SubstreamOnNext(key: SubstreamKey, element: Any) case class SubstreamOnError(key: SubstreamKey, e: Throwable) case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription) -} -/** - * INTERNAL API - */ -private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) { - import akka.stream.impl.MultiStreamInputProcessor._ - var nextId = 0 - - private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInputs] - - class SubstreamInputs(val key: SubstreamKey) extends BatchingInputBuffer(settings.initialInputBufferSize, pump = this) { + class SubstreamInput(val key: SubstreamKey, bufferSize: Int, processor: MultiStreamInputProcessorLike, pump: Pump) extends BatchingInputBuffer(bufferSize, pump) { // Not driven directly override val subreceive = new SubReceive(Actor.emptyBehavior) @@ -231,11 +241,25 @@ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSe override protected def inputOnError(e: Throwable): Unit = { super.inputOnError(e) - invalidateSubstream(key, e) + processor.invalidateSubstreamInput(key, e) } } - val substreamManagement: Receive = { +} + +/** + * INTERNAL API + */ +private[akka] trait MultiStreamInputProcessorLike extends Pump { this: Actor ⇒ + + import MultiStreamInputProcessor._ + + protected def nextId(): Long + protected def inputBufferSize: Int + + private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInput] + + val inputSubstreamManagement: Receive = { case SubstreamStreamOnSubscribe(key, subscription) ⇒ substreamInputs(key).substreamOnSubscribe(subscription) case SubstreamOnNext(key, element) ⇒ substreamInputs(key).substreamOnNext(element) case SubstreamOnComplete(key) ⇒ { @@ -246,30 +270,53 @@ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSe } - def createSubstreamInputs(p: Publisher[Any]): SubstreamInputs = { - val key = SubstreamKey(nextId) - val inputs = new SubstreamInputs(key) - p.subscribe(new SubstreamSubscriber(self, key)) + def createSubstreamInput(): SubstreamInput = { + val key = SubstreamKey(nextId()) + val inputs = new SubstreamInput(key, inputBufferSize, this, this) substreamInputs(key) = inputs - nextId += 1 inputs } - protected def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = { + def createAndSubscribeSubstreamInput(p: Publisher[Any]): SubstreamInput = { + val inputs = createSubstreamInput() + p.subscribe(new SubstreamSubscriber(self, inputs.key)) + inputs + } + + def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = { substreamInputs(substream).cancel() substreamInputs -= substream pump() } - override def fail(e: Throwable): Unit = { + protected def failInputs(e: Throwable): Unit = { substreamInputs.values foreach (_.cancel()) + } + + protected def finishInputs(): Unit = { + substreamInputs.values foreach (_.cancel()) + } + +} + +/** + * INTERNAL API + */ +private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) with MultiStreamInputProcessorLike { + private var _nextId = 0L + protected def nextId(): Long = { _nextId += 1; _nextId } + + override protected val inputBufferSize = _settings.initialInputBufferSize + + override protected def fail(e: Throwable) = { + failInputs(e) super.fail(e) } - override def pumpFinished(): Unit = { - substreamInputs.values foreach (_.cancel()) + override def pumpFinished() = { + finishInputs() super.pumpFinished() } - override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement + override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse inputSubstreamManagement } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl2/ConcatAllImpl.scala index 1f9f17ef76..f493951d09 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ConcatAllImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ConcatAllImpl.scala @@ -3,9 +3,6 @@ */ package akka.stream.impl2 -import akka.stream.MaterializerSettings -import org.reactivestreams.Publisher -import akka.stream.impl.MultiStreamInputProcessor.SubstreamKey import akka.stream.impl.TransferPhase import akka.stream.impl.MultiStreamInputProcessor import akka.stream.scaladsl2.FlowWithSource @@ -17,15 +14,17 @@ import akka.stream.scaladsl2.FlowMaterializer private[akka] class ConcatAllImpl(materializer: FlowMaterializer) extends MultiStreamInputProcessor(materializer.settings) { + import MultiStreamInputProcessor._ + val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ val flow = primaryInputs.dequeueInputElement().asInstanceOf[FlowWithSource[Any, Any]] val publisher = flow.toPublisher()(materializer) - // FIXME we can pass the flow to createSubstreamInputs (but avoiding copy impl now) - val inputs = createSubstreamInputs(publisher) + // FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now) + val inputs = createAndSubscribeSubstreamInput(publisher) nextPhase(streamSubstream(inputs)) } - def streamSubstream(substream: SubstreamInputs): TransferPhase = + def streamSubstream(substream: SubstreamInput): TransferPhase = TransferPhase(substream.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { () ⇒ if (substream.inputsDepleted) nextPhase(takeNextSubstream) else primaryOutputs.enqueueOutputElement(substream.dequeueInputElement()) @@ -33,5 +32,5 @@ private[akka] class ConcatAllImpl(materializer: FlowMaterializer) nextPhase(takeNextSubstream) - override def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = fail(e) + override def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = fail(e) } diff --git a/akka-stream/src/main/scala/akka/stream/impl2/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl2/GroupByProcessorImpl.scala index a6babacbcd..6a9cbd9d8f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/GroupByProcessorImpl.scala @@ -1,71 +1,74 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.stream.impl2 - -import akka.stream.MaterializerSettings -import akka.stream.impl.TransferPhase -import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey -import akka.stream.scaladsl2.FlowFrom -import akka.stream.impl.MultiStreamOutputProcessor - -/** - * INTERNAL API - */ -private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { - var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs] - - var pendingSubstreamOutputs: SubstreamOutputs = _ - - // No substream is open yet. If downstream cancels now, we are complete - val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - nextPhase(openSubstream(elem, key)) - } - - // some substreams are open now. If downstream cancels, we still continue until the substreams are closed - val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - - keyToSubstreamOutputs.get(key) match { - case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutputs(key))) - case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) - case _ ⇒ // stay - } - } - - def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - if (primaryOutputs.isClosed) { - // Just drop, we do not open any more substreams - nextPhase(waitNext) - } else { - val substreamOutput = newSubstream() - val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher - primaryOutputs.enqueueOutputElement((key, substreamFlow)) - keyToSubstreamOutputs(key) = substreamOutput - nextPhase(dispatchToSubstream(elem, substreamOutput)) - } - } - - def dispatchToSubstream(elem: Any, substream: SubstreamOutputs): TransferPhase = { - pendingSubstreamOutputs = substream - TransferPhase(substream.NeedsDemand) { () ⇒ - substream.enqueueOutputElement(elem) - pendingSubstreamOutputs = null - nextPhase(waitNext) - } - } - - nextPhase(waitFirst) - - override def invalidateSubstream(substream: SubstreamKey): Unit = { - if ((pendingSubstreamOutputs ne null) && substream == pendingSubstreamOutputs.key) { - pendingSubstreamOutputs = null - nextPhase(waitNext) - } - super.invalidateSubstream(substream) - } - -} +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl2 + +import akka.stream.MaterializerSettings +import akka.stream.impl.TransferPhase +import akka.stream.scaladsl2.FlowFrom +import akka.stream.impl.MultiStreamOutputProcessor + +/** + * INTERNAL API + */ +private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) + extends MultiStreamOutputProcessor(settings) { + + import MultiStreamOutputProcessor._ + + var keyToSubstreamOutput = collection.mutable.Map.empty[Any, SubstreamOutput] + + var pendingSubstreamOutput: SubstreamOutput = _ + + // No substream is open yet. If downstream cancels now, we are complete + val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + nextPhase(openSubstream(elem, key)) + } + + // some substreams are open now. If downstream cancels, we still continue until the substreams are closed + val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + + keyToSubstreamOutput.get(key) match { + case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutput(key))) + case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) + case _ ⇒ // stay + } + } + + def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + if (primaryOutputs.isClosed) { + // Just drop, we do not open any more substreams + nextPhase(waitNext) + } else { + val substreamOutput = createSubstreamOutput() + val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher + primaryOutputs.enqueueOutputElement((key, substreamFlow)) + keyToSubstreamOutput(key) = substreamOutput + nextPhase(dispatchToSubstream(elem, substreamOutput)) + } + } + + def dispatchToSubstream(elem: Any, substream: SubstreamOutput): TransferPhase = { + pendingSubstreamOutput = substream + TransferPhase(substream.NeedsDemand) { () ⇒ + substream.enqueueOutputElement(elem) + pendingSubstreamOutput = null + nextPhase(waitNext) + } + } + + nextPhase(waitFirst) + + override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { + if ((pendingSubstreamOutput ne null) && substream == pendingSubstreamOutput.key) { + pendingSubstreamOutput = null + nextPhase(waitNext) + } + super.invalidateSubstreamOutput(substream) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl2/PrefixAndTailImpl.scala b/akka-stream/src/main/scala/akka/stream/impl2/PrefixAndTailImpl.scala index 113648d80b..55de49ad50 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/PrefixAndTailImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/PrefixAndTailImpl.scala @@ -16,6 +16,8 @@ import akka.stream.scaladsl2.FlowFrom private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeMax: Int) extends MultiStreamOutputProcessor(_settings) { + import MultiStreamOutputProcessor._ + var taken = immutable.Vector.empty[Any] var left = takeMax @@ -32,7 +34,7 @@ private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeM } } - def streamTailPhase(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ + def streamTailPhase(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ substream.enqueueOutputElement(primaryInputs.dequeueInputElement()) } @@ -47,7 +49,7 @@ private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeM } def emitNonEmptyTail(): Unit = { - val substreamOutput = newSubstream() + val substreamOutput = createSubstreamOutput() val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher primaryOutputs.enqueueOutputElement((taken, substreamFlow)) primaryOutputs.complete() diff --git a/akka-stream/src/main/scala/akka/stream/impl2/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl2/SplitWhenProcessorImpl.scala index 757338840b..39e5681152 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/SplitWhenProcessorImpl.scala @@ -4,7 +4,6 @@ package akka.stream.impl2 import akka.stream.MaterializerSettings -import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey import akka.stream.impl.TransferPhase import akka.stream.impl.MultiStreamOutputProcessor import akka.stream.scaladsl2.FlowFrom @@ -15,14 +14,16 @@ import akka.stream.scaladsl2.FlowFrom private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val splitPredicate: Any ⇒ Boolean) extends MultiStreamOutputProcessor(_settings) { - var currentSubstream: SubstreamOutputs = _ + import MultiStreamOutputProcessor._ + + var currentSubstream: SubstreamOutput = _ val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ nextPhase(openSubstream(primaryInputs.dequeueInputElement())) } def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - val substreamOutput = newSubstream() + val substreamOutput = createSubstreamOutput() val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher primaryOutputs.enqueueOutputElement(substreamFlow) currentSubstream = substreamOutput @@ -30,13 +31,13 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val } // Serving the substream is split into two phases to minimize elements "held in hand" - def serveSubstreamFirst(substream: SubstreamOutputs, elem: Any) = TransferPhase(substream.NeedsDemand) { () ⇒ + def serveSubstreamFirst(substream: SubstreamOutput, elem: Any) = TransferPhase(substream.NeedsDemand) { () ⇒ substream.enqueueOutputElement(elem) nextPhase(serveSubstreamRest(substream)) } // Note that this phase is allocated only once per _slice_ and not per element - def serveSubstreamRest(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ + def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ val elem = primaryInputs.dequeueInputElement() if (splitPredicate(elem)) { currentSubstream.complete() @@ -53,9 +54,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val nextPhase(waitFirst) - override def invalidateSubstream(substream: SubstreamKey): Unit = { + override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) - super.invalidateSubstream(substream) + super.invalidateSubstreamOutput(substream) } }