=str #15402 Split up MultiStream(Input|Output)Processor since SSL will use both

This commit is contained in:
Björn Antonsson 2014-09-10 12:14:09 +02:00
parent 076cdaccbb
commit 8265f92330
9 changed files with 279 additions and 219 deletions

View file

@ -12,13 +12,15 @@ import akka.stream.impl.MultiStreamInputProcessor.SubstreamKey
*/
private[akka] class ConcatAllImpl(_settings: MaterializerSettings) extends MultiStreamInputProcessor(_settings) {
import MultiStreamInputProcessor._
val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
val publisher = primaryInputs.dequeueInputElement().asInstanceOf[Publisher[Any]]
val inputs = createSubstreamInputs(publisher)
val inputs = createAndSubscribeSubstreamInput(publisher)
nextPhase(streamSubstream(inputs))
}
def streamSubstream(substream: SubstreamInputs): TransferPhase =
def streamSubstream(substream: SubstreamInput): TransferPhase =
TransferPhase(substream.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { ()
if (substream.inputsDepleted) nextPhase(takeNextSubstream)
else primaryOutputs.enqueueOutputElement(substream.dequeueInputElement())
@ -26,5 +28,5 @@ private[akka] class ConcatAllImpl(_settings: MaterializerSettings) extends Multi
nextPhase(takeNextSubstream)
override def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = fail(e)
override def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = fail(e)
}

View file

@ -11,9 +11,11 @@ import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey
* INTERNAL API
*/
private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any Any) extends MultiStreamOutputProcessor(settings) {
var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs]
import MultiStreamOutputProcessor._
var pendingSubstreamOutputs: SubstreamOutputs = _
var keyToSubstreamOutput = collection.mutable.Map.empty[Any, SubstreamOutput]
var pendingSubstreamOutput: SubstreamOutput = _
// No substream is open yet. If downstream cancels now, we are complete
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
@ -27,8 +29,8 @@ private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val key
val elem = primaryInputs.dequeueInputElement()
val key = keyFor(elem)
keyToSubstreamOutputs.get(key) match {
case Some(substream) if substream.isOpen nextPhase(dispatchToSubstream(elem, keyToSubstreamOutputs(key)))
keyToSubstreamOutput.get(key) match {
case Some(substream) if substream.isOpen nextPhase(dispatchToSubstream(elem, keyToSubstreamOutput(key)))
case None if primaryOutputs.isOpen nextPhase(openSubstream(elem, key))
case _ // stay
}
@ -39,30 +41,30 @@ private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val key
// Just drop, we do not open any more substreams
nextPhase(waitNext)
} else {
val substreamOutput = newSubstream()
val substreamOutput = createSubstreamOutput()
primaryOutputs.enqueueOutputElement((key, substreamOutput))
keyToSubstreamOutputs(key) = substreamOutput
keyToSubstreamOutput(key) = substreamOutput
nextPhase(dispatchToSubstream(elem, substreamOutput))
}
}
def dispatchToSubstream(elem: Any, substream: SubstreamOutputs): TransferPhase = {
pendingSubstreamOutputs = substream
def dispatchToSubstream(elem: Any, substream: SubstreamOutput): TransferPhase = {
pendingSubstreamOutput = substream
TransferPhase(substream.NeedsDemand) { ()
substream.enqueueOutputElement(elem)
pendingSubstreamOutputs = null
pendingSubstreamOutput = null
nextPhase(waitNext)
}
}
nextPhase(waitFirst)
override def invalidateSubstream(substream: SubstreamKey): Unit = {
if ((pendingSubstreamOutputs ne null) && substream == pendingSubstreamOutputs.key) {
pendingSubstreamOutputs = null
override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
if ((pendingSubstreamOutput ne null) && substream == pendingSubstreamOutput.key) {
pendingSubstreamOutput = null
nextPhase(waitNext)
}
super.invalidateSubstream(substream)
super.invalidateSubstreamOutput(substream)
}
}

View file

@ -12,6 +12,8 @@ import scala.collection.immutable
private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeMax: Int)
extends MultiStreamOutputProcessor(_settings) {
import MultiStreamOutputProcessor._
var taken = immutable.Vector.empty[Any]
var left = takeMax
@ -28,7 +30,7 @@ private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeM
}
}
def streamTailPhase(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
def streamTailPhase(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
substream.enqueueOutputElement(primaryInputs.dequeueInputElement())
}
@ -43,7 +45,7 @@ private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeM
}
def emitNonEmptyTail(): Unit = {
val substreamOutput = newSubstream()
val substreamOutput = createSubstreamOutput()
primaryOutputs.enqueueOutputElement((taken, substreamOutput))
primaryOutputs.complete()
nextPhase(streamTailPhase(substreamOutput))

View file

@ -12,27 +12,29 @@ import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey
private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val splitPredicate: Any Boolean)
extends MultiStreamOutputProcessor(_settings) {
var currentSubstream: SubstreamOutputs = _
import MultiStreamOutputProcessor._
var currentSubstream: SubstreamOutput = _
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
nextPhase(openSubstream(primaryInputs.dequeueInputElement()))
}
def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { ()
val substreamOutput = newSubstream()
val substreamOutput = createSubstreamOutput()
primaryOutputs.enqueueOutputElement(substreamOutput)
currentSubstream = substreamOutput
nextPhase(serveSubstreamFirst(currentSubstream, elem))
}
// Serving the substream is split into two phases to minimize elements "held in hand"
def serveSubstreamFirst(substream: SubstreamOutputs, elem: Any) = TransferPhase(substream.NeedsDemand) { ()
def serveSubstreamFirst(substream: SubstreamOutput, elem: Any) = TransferPhase(substream.NeedsDemand) { ()
substream.enqueueOutputElement(elem)
nextPhase(serveSubstreamRest(substream))
}
// Note that this phase is allocated only once per _slice_ and not per element
def serveSubstreamRest(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
val elem = primaryInputs.dequeueInputElement()
if (splitPredicate(elem)) {
currentSubstream.complete()
@ -49,9 +51,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val
nextPhase(waitFirst)
override def invalidateSubstream(substream: SubstreamKey): Unit = {
override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream)
super.invalidateSubstream(substream)
super.invalidateSubstreamOutput(substream)
}
}

View file

@ -27,26 +27,20 @@ private[akka] object MultiStreamOutputProcessor {
override def toString = "SubstreamSubscription" + System.identityHashCode(this)
}
}
/**
* INTERNAL API
*/
private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
import MultiStreamOutputProcessor._
private var nextId = 0
private val substreamOutputs = mutable.Map.empty[SubstreamKey, SubstreamOutputs]
class SubstreamOutputs(val key: SubstreamKey) extends SimpleOutputs(self, this) with Publisher[Any] {
object SubstreamOutput {
sealed trait PublisherState
sealed trait CompletedState extends PublisherState
case object Open extends PublisherState
final case class Attached(sub: Subscriber[Any]) extends PublisherState
case object Completed extends CompletedState
final case class Failed(e: Throwable) extends CompletedState
}
private val subscription = new SubstreamSubscription(self, key)
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump) extends SimpleOutputs(actor, pump) with Publisher[Any] {
import SubstreamOutput._
private val subscription = new SubstreamSubscription(actor, key)
private val state = new AtomicReference[PublisherState](Open)
override def subreceive: SubReceive =
@ -85,7 +79,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
}
override def subscribe(s: Subscriber[Any]): Unit = {
if (state.compareAndSet(Open, Attached(s))) self ! SubstreamSubscribe(key, s)
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher"))
@ -101,38 +95,64 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
subscriber.onSubscribe(subscription)
} else subscriber.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher"))
}
}
protected def newSubstream(): SubstreamOutputs = {
val id = SubstreamKey(nextId)
nextId += 1
val outputs = new SubstreamOutputs(id)
/**
* INTERNAL API
*/
private[akka] trait MultiStreamOutputProcessorLike extends Pump { this: Actor
import MultiStreamOutputProcessor._
protected def nextId(): Long
private val substreamOutputs = mutable.Map.empty[SubstreamKey, SubstreamOutput]
protected def createSubstreamOutput(): SubstreamOutput = {
val id = SubstreamKey(nextId())
val outputs = new SubstreamOutput(id, self, this)
substreamOutputs(outputs.key) = outputs
outputs
}
protected def invalidateSubstream(substream: SubstreamKey): Unit = {
protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
substreamOutputs(substream).complete()
substreamOutputs -= substream
pump()
}
override def fail(e: Throwable): Unit = {
protected def failOutputs(e: Throwable): Unit = {
substreamOutputs.values foreach (_.cancel(e))
}
protected def finishOutputs(): Unit = {
substreamOutputs.values foreach (_.complete())
}
val outputSubstreamManagement: Receive = {
case SubstreamRequestMore(key, demand) substreamOutputs(key).enqueueOutputDemand(demand)
case SubstreamCancel(key) invalidateSubstreamOutput(key)
case SubstreamSubscribe(key, subscriber) substreamOutputs(key).attachSubscriber(subscriber)
}
}
/**
* INTERNAL API
*/
private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) with MultiStreamOutputProcessorLike {
private var _nextId = 0L
protected def nextId(): Long = { _nextId += 1; _nextId }
override protected def fail(e: Throwable): Unit = {
failOutputs(e)
super.fail(e)
}
override def pumpFinished(): Unit = {
substreamOutputs.values foreach (_.complete())
finishOutputs()
super.pumpFinished()
}
val substreamManagement: Receive = {
case SubstreamRequestMore(key, demand) substreamOutputs(key).enqueueOutputDemand(demand)
case SubstreamCancel(key) invalidateSubstream(key)
case SubstreamSubscribe(key, subscriber) substreamOutputs(key).attachSubscriber(subscriber)
}
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement
}
/**
@ -196,7 +216,7 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
* INTERNAL API
*/
private[akka] object MultiStreamInputProcessor {
case class SubstreamKey(id: Int)
case class SubstreamKey(id: Long)
class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! SubstreamOnError(key, cause)
@ -209,18 +229,8 @@ private[akka] object MultiStreamInputProcessor {
case class SubstreamOnNext(key: SubstreamKey, element: Any)
case class SubstreamOnError(key: SubstreamKey, e: Throwable)
case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription)
}
/**
* INTERNAL API
*/
private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
import akka.stream.impl.MultiStreamInputProcessor._
var nextId = 0
private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInputs]
class SubstreamInputs(val key: SubstreamKey) extends BatchingInputBuffer(settings.initialInputBufferSize, pump = this) {
class SubstreamInput(val key: SubstreamKey, bufferSize: Int, processor: MultiStreamInputProcessorLike, pump: Pump) extends BatchingInputBuffer(bufferSize, pump) {
// Not driven directly
override val subreceive = new SubReceive(Actor.emptyBehavior)
@ -231,11 +241,25 @@ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSe
override protected def inputOnError(e: Throwable): Unit = {
super.inputOnError(e)
invalidateSubstream(key, e)
processor.invalidateSubstreamInput(key, e)
}
}
val substreamManagement: Receive = {
}
/**
* INTERNAL API
*/
private[akka] trait MultiStreamInputProcessorLike extends Pump { this: Actor
import MultiStreamInputProcessor._
protected def nextId(): Long
protected def inputBufferSize: Int
private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInput]
val inputSubstreamManagement: Receive = {
case SubstreamStreamOnSubscribe(key, subscription) substreamInputs(key).substreamOnSubscribe(subscription)
case SubstreamOnNext(key, element) substreamInputs(key).substreamOnNext(element)
case SubstreamOnComplete(key) {
@ -246,30 +270,53 @@ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSe
}
def createSubstreamInputs(p: Publisher[Any]): SubstreamInputs = {
val key = SubstreamKey(nextId)
val inputs = new SubstreamInputs(key)
p.subscribe(new SubstreamSubscriber(self, key))
def createSubstreamInput(): SubstreamInput = {
val key = SubstreamKey(nextId())
val inputs = new SubstreamInput(key, inputBufferSize, this, this)
substreamInputs(key) = inputs
nextId += 1
inputs
}
protected def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = {
def createAndSubscribeSubstreamInput(p: Publisher[Any]): SubstreamInput = {
val inputs = createSubstreamInput()
p.subscribe(new SubstreamSubscriber(self, inputs.key))
inputs
}
def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = {
substreamInputs(substream).cancel()
substreamInputs -= substream
pump()
}
override def fail(e: Throwable): Unit = {
protected def failInputs(e: Throwable): Unit = {
substreamInputs.values foreach (_.cancel())
}
protected def finishInputs(): Unit = {
substreamInputs.values foreach (_.cancel())
}
}
/**
* INTERNAL API
*/
private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) with MultiStreamInputProcessorLike {
private var _nextId = 0L
protected def nextId(): Long = { _nextId += 1; _nextId }
override protected val inputBufferSize = _settings.initialInputBufferSize
override protected def fail(e: Throwable) = {
failInputs(e)
super.fail(e)
}
override def pumpFinished(): Unit = {
substreamInputs.values foreach (_.cancel())
override def pumpFinished() = {
finishInputs()
super.pumpFinished()
}
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse inputSubstreamManagement
}

View file

@ -3,9 +3,6 @@
*/
package akka.stream.impl2
import akka.stream.MaterializerSettings
import org.reactivestreams.Publisher
import akka.stream.impl.MultiStreamInputProcessor.SubstreamKey
import akka.stream.impl.TransferPhase
import akka.stream.impl.MultiStreamInputProcessor
import akka.stream.scaladsl2.FlowWithSource
@ -17,15 +14,17 @@ import akka.stream.scaladsl2.FlowMaterializer
private[akka] class ConcatAllImpl(materializer: FlowMaterializer)
extends MultiStreamInputProcessor(materializer.settings) {
import MultiStreamInputProcessor._
val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
val flow = primaryInputs.dequeueInputElement().asInstanceOf[FlowWithSource[Any, Any]]
val publisher = flow.toPublisher()(materializer)
// FIXME we can pass the flow to createSubstreamInputs (but avoiding copy impl now)
val inputs = createSubstreamInputs(publisher)
// FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now)
val inputs = createAndSubscribeSubstreamInput(publisher)
nextPhase(streamSubstream(inputs))
}
def streamSubstream(substream: SubstreamInputs): TransferPhase =
def streamSubstream(substream: SubstreamInput): TransferPhase =
TransferPhase(substream.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { ()
if (substream.inputsDepleted) nextPhase(takeNextSubstream)
else primaryOutputs.enqueueOutputElement(substream.dequeueInputElement())
@ -33,5 +32,5 @@ private[akka] class ConcatAllImpl(materializer: FlowMaterializer)
nextPhase(takeNextSubstream)
override def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = fail(e)
override def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = fail(e)
}

View file

@ -5,17 +5,20 @@ package akka.stream.impl2
import akka.stream.MaterializerSettings
import akka.stream.impl.TransferPhase
import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey
import akka.stream.scaladsl2.FlowFrom
import akka.stream.impl.MultiStreamOutputProcessor
/**
* INTERNAL API
*/
private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any Any) extends MultiStreamOutputProcessor(settings) {
var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs]
private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any Any)
extends MultiStreamOutputProcessor(settings) {
var pendingSubstreamOutputs: SubstreamOutputs = _
import MultiStreamOutputProcessor._
var keyToSubstreamOutput = collection.mutable.Map.empty[Any, SubstreamOutput]
var pendingSubstreamOutput: SubstreamOutput = _
// No substream is open yet. If downstream cancels now, we are complete
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
@ -29,8 +32,8 @@ private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val key
val elem = primaryInputs.dequeueInputElement()
val key = keyFor(elem)
keyToSubstreamOutputs.get(key) match {
case Some(substream) if substream.isOpen nextPhase(dispatchToSubstream(elem, keyToSubstreamOutputs(key)))
keyToSubstreamOutput.get(key) match {
case Some(substream) if substream.isOpen nextPhase(dispatchToSubstream(elem, keyToSubstreamOutput(key)))
case None if primaryOutputs.isOpen nextPhase(openSubstream(elem, key))
case _ // stay
}
@ -41,31 +44,31 @@ private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val key
// Just drop, we do not open any more substreams
nextPhase(waitNext)
} else {
val substreamOutput = newSubstream()
val substreamOutput = createSubstreamOutput()
val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher
primaryOutputs.enqueueOutputElement((key, substreamFlow))
keyToSubstreamOutputs(key) = substreamOutput
keyToSubstreamOutput(key) = substreamOutput
nextPhase(dispatchToSubstream(elem, substreamOutput))
}
}
def dispatchToSubstream(elem: Any, substream: SubstreamOutputs): TransferPhase = {
pendingSubstreamOutputs = substream
def dispatchToSubstream(elem: Any, substream: SubstreamOutput): TransferPhase = {
pendingSubstreamOutput = substream
TransferPhase(substream.NeedsDemand) { ()
substream.enqueueOutputElement(elem)
pendingSubstreamOutputs = null
pendingSubstreamOutput = null
nextPhase(waitNext)
}
}
nextPhase(waitFirst)
override def invalidateSubstream(substream: SubstreamKey): Unit = {
if ((pendingSubstreamOutputs ne null) && substream == pendingSubstreamOutputs.key) {
pendingSubstreamOutputs = null
override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
if ((pendingSubstreamOutput ne null) && substream == pendingSubstreamOutput.key) {
pendingSubstreamOutput = null
nextPhase(waitNext)
}
super.invalidateSubstream(substream)
super.invalidateSubstreamOutput(substream)
}
}

View file

@ -16,6 +16,8 @@ import akka.stream.scaladsl2.FlowFrom
private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeMax: Int)
extends MultiStreamOutputProcessor(_settings) {
import MultiStreamOutputProcessor._
var taken = immutable.Vector.empty[Any]
var left = takeMax
@ -32,7 +34,7 @@ private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeM
}
}
def streamTailPhase(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
def streamTailPhase(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
substream.enqueueOutputElement(primaryInputs.dequeueInputElement())
}
@ -47,7 +49,7 @@ private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeM
}
def emitNonEmptyTail(): Unit = {
val substreamOutput = newSubstream()
val substreamOutput = createSubstreamOutput()
val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher
primaryOutputs.enqueueOutputElement((taken, substreamFlow))
primaryOutputs.complete()

View file

@ -4,7 +4,6 @@
package akka.stream.impl2
import akka.stream.MaterializerSettings
import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey
import akka.stream.impl.TransferPhase
import akka.stream.impl.MultiStreamOutputProcessor
import akka.stream.scaladsl2.FlowFrom
@ -15,14 +14,16 @@ import akka.stream.scaladsl2.FlowFrom
private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val splitPredicate: Any Boolean)
extends MultiStreamOutputProcessor(_settings) {
var currentSubstream: SubstreamOutputs = _
import MultiStreamOutputProcessor._
var currentSubstream: SubstreamOutput = _
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
nextPhase(openSubstream(primaryInputs.dequeueInputElement()))
}
def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { ()
val substreamOutput = newSubstream()
val substreamOutput = createSubstreamOutput()
val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher
primaryOutputs.enqueueOutputElement(substreamFlow)
currentSubstream = substreamOutput
@ -30,13 +31,13 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val
}
// Serving the substream is split into two phases to minimize elements "held in hand"
def serveSubstreamFirst(substream: SubstreamOutputs, elem: Any) = TransferPhase(substream.NeedsDemand) { ()
def serveSubstreamFirst(substream: SubstreamOutput, elem: Any) = TransferPhase(substream.NeedsDemand) { ()
substream.enqueueOutputElement(elem)
nextPhase(serveSubstreamRest(substream))
}
// Note that this phase is allocated only once per _slice_ and not per element
def serveSubstreamRest(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
val elem = primaryInputs.dequeueInputElement()
if (splitPredicate(elem)) {
currentSubstream.complete()
@ -53,9 +54,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val
nextPhase(waitFirst)
override def invalidateSubstream(substream: SubstreamKey): Unit = {
override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream)
super.invalidateSubstream(substream)
super.invalidateSubstreamOutput(substream)
}
}