+str : Various cleanups of internal streams architecture

- factor out receive blocks for inputs and outputs
 - pump uses TransferPhases and become
 - Unification of Transformer and RecoveryTransformer
This commit is contained in:
Endre Sándor Varga 2014-05-12 16:45:30 +02:00
parent e6978d494a
commit 646b376333
29 changed files with 694 additions and 855 deletions

View file

@ -4,7 +4,7 @@
package akka.stream.impl
import akka.stream.MaterializerSettings
import akka.actor.{ Terminated, ActorRef }
import akka.actor.{ Actor, Terminated, ActorRef }
import org.reactivestreams.spi.{ Subscriber, Subscription }
import org.reactivestreams.api.Producer
@ -37,8 +37,12 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
private var completed: Boolean = false
private var demands: Int = 0
val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings).
withDispatcher(context.props.dispatcher)))
override def receive: SubReceive =
throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block")
val substream = context.watch(context.actorOf(
IdentityProcessorImpl.props(settings)
.withDispatcher(context.props.dispatcher)))
val processor = new ActorProcessor[AnyRef, AnyRef](substream)
override def isClosed: Boolean = completed
@ -76,12 +80,12 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
outputs
}
def fullyCompleted: Boolean = isShuttingDown && isPumpFinished && context.children.isEmpty
def fullyCompleted: Boolean = primaryOutputsShutdown && isPumpFinished && context.children.isEmpty
protected def invalidateSubstream(substream: ActorRef): Unit = {
substreamOutputs(substream).complete()
substreamOutputs -= substream
if (fullyCompleted) shutdown()
shutdownHooks()
pump()
}
@ -90,19 +94,15 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
super.fail(e)
}
override def primaryOutputsFinished(completed: Boolean): Unit = {
// If the master stream is cancelled (no one consumes substreams as elements from the master stream)
// then this callback does not mean we are shutting down
// We can only shut down after all substreams (our children) are closed
if (fullyCompleted) shutdown()
}
// FIXME: proper shutdown scheduling
override def shutdownHooks(): Unit = if (fullyCompleted) super.shutdownHooks()
override def pumpFinished(): Unit = {
context.children foreach (_ ! OnComplete)
super.pumpFinished()
}
override val downstreamManagement: Receive = super.downstreamManagement orElse {
val substreamManagement: Receive = {
case SubstreamRequestMore(key, demand)
substreamOutputs(key).enqueueOutputDemand(demand)
pump()
@ -110,6 +110,8 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
case Terminated(child) invalidateSubstream(child)
}
override def receive = primaryInputs.subreceive orElse primaryOutputs.receive orElse substreamManagement
}
/**
@ -135,51 +137,31 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
extends ActorProcessorImpl(_settings) {
import TwoStreamInputProcessor._
var secondaryInputs: Inputs = _
val secondaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) {
override val subreceive: SubReceive = new SubReceive(waitingForUpstream)
override def primaryOutputsReady(): Unit = {
other.getPublisher.subscribe(new OtherActorSubscriber(self))
super.primaryOutputsReady()
override def inputOnError(e: Throwable): Unit = TwoStreamInputProcessor.this.onError(e)
override def waitingForUpstream: Receive = {
case OtherStreamOnComplete onComplete()
case OtherStreamOnSubscribe(subscription) onSubscribe(subscription)
}
override def upstreamRunning: Receive = {
case OtherStreamOnNext(element) enqueueInputElement(element)
case OtherStreamOnComplete onComplete()
}
override protected def completed: Actor.Receive = {
case OtherStreamOnSubscribe(_) throw new IllegalStateException("Cannot subscribe shutdown subscriber")
}
}
override def waitingForUpstream: Receive = super.waitingForUpstream orElse {
case OtherStreamOnComplete
secondaryInputs = EmptyInputs
transitionToRunningWhenReady()
case OtherStreamOnSubscribe(subscription)
assert(subscription != null)
secondaryInputs = new BatchingInputBuffer(subscription, settings.initialInputBufferSize)
transitionToRunningWhenReady()
}
override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.receive
override def running: Receive = super.running orElse {
case OtherStreamOnNext(element)
secondaryInputs.enqueueInputElement(element)
pump()
case OtherStreamOnComplete
secondaryInputs.complete()
primaryInputOnComplete()
pump()
}
other.getPublisher.subscribe(new OtherActorSubscriber(self))
override def primaryInputOnComplete(): Unit = {
if (secondaryInputs.isClosed && primaryInputs.isClosed)
super.primaryInputOnComplete()
override def shutdownHooks(): Unit = {
secondaryInputs.cancel()
super.shutdownHooks()
}
override def transitionToRunningWhenReady(): Unit = if ((primaryInputs ne null) && (secondaryInputs ne null)) {
secondaryInputs.prefetch()
super.transitionToRunningWhenReady()
}
override def fail(cause: Throwable): Unit = {
if (secondaryInputs ne null) secondaryInputs.cancel()
super.fail(cause)
}
override def primaryOutputsFinished(completed: Boolean) {
if (secondaryInputs ne null) secondaryInputs.cancel()
super.primaryOutputsFinished(completed)
}
}