+str #15088, #15200: Add takeAndTail and concatAll

This commit is contained in:
Endre Sándor Varga 2014-05-16 14:21:15 +02:00
parent 7d2186c73d
commit a99902077e
17 changed files with 581 additions and 29 deletions

View file

@ -38,7 +38,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
private var completed: Boolean = false
private var demands: Int = 0
override def receive: SubReceive =
override def subreceive: SubReceive =
throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block")
val substream = context.watch(context.actorOf(
@ -112,7 +112,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
}
override def receive = primaryInputs.subreceive orElse primaryOutputs.receive orElse substreamManagement
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
}
/**
@ -157,7 +157,7 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
}
}
override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.receive
override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive
other.getPublisher.subscribe(new OtherActorSubscriber(self))
@ -165,4 +165,86 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
secondaryInputs.cancel()
super.shutdownHooks()
}
}
/**
* INTERNAL API
*/
private[akka] object MultiStreamInputProcessor {
case class SubstreamKey(id: Int)
class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! SubstreamOnError(key, cause)
override def onComplete(): Unit = impl ! SubstreamOnComplete(key)
override def onNext(element: T): Unit = impl ! SubstreamOnNext(key, element)
override def onSubscribe(subscription: Subscription): Unit = impl ! SubstreamStreamOnSubscribe(key, subscription)
}
case class SubstreamOnComplete(key: SubstreamKey)
case class SubstreamOnNext(key: SubstreamKey, element: Any)
case class SubstreamOnError(key: SubstreamKey, e: Throwable)
case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription)
}
/**
* INTERNAL API
*/
private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
import MultiStreamInputProcessor._
var nextId = 0
private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInputs]
class SubstreamInputs(val key: SubstreamKey) extends BatchingInputBuffer(settings.initialInputBufferSize, pump = this) {
// Not driven directly
override val subreceive = new SubReceive(Actor.emptyBehavior)
def substreamOnComplete(): Unit = onComplete()
def substreamOnSubscribe(subscription: Subscription): Unit = onSubscribe(subscription)
def substreamOnError(e: Throwable): Unit = onError(e)
def substreamOnNext(elem: Any): Unit = enqueueInputElement(elem)
override protected def inputOnError(e: Throwable): Unit = {
super.inputOnError(e)
invalidateSubstream(key, e)
}
}
val substreamManagement: Receive = {
case SubstreamStreamOnSubscribe(key, subscription) substreamInputs(key).substreamOnSubscribe(subscription)
case SubstreamOnNext(key, element) substreamInputs(key).substreamOnNext(element)
case SubstreamOnComplete(key) {
substreamInputs(key).substreamOnComplete()
substreamInputs -= key
}
case SubstreamOnError(key, e) substreamInputs(key).substreamOnError(e)
}
def createSubstreamInputs(p: Producer[Any]): SubstreamInputs = {
val key = SubstreamKey(nextId)
val inputs = new SubstreamInputs(key)
p.getPublisher.subscribe(new SubstreamSubscriber(self, key))
substreamInputs(key) = inputs
nextId += 1
inputs
}
protected def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = {
substreamInputs(substream).cancel()
substreamInputs -= substream
pump()
}
override def fail(e: Throwable): Unit = {
substreamInputs.values foreach (_.cancel())
super.fail(e)
}
override def shutdownHooks(): Unit = {
substreamInputs.values foreach (_.cancel())
super.shutdownHooks()
}
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
}