!str #15604: Removal of default fanout behavior

This commit is contained in:
Endre Sándor Varga 2014-09-01 13:30:15 +02:00
parent 72080a7cc2
commit 3a60ed96d1
26 changed files with 408 additions and 344 deletions

View file

@ -3,25 +3,27 @@
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ Actor, ActorRef }
import akka.stream.MaterializerSettings
import akka.actor.{ Actor, Terminated, ActorRef }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
import akka.actor.Stash
import scala.collection.mutable
/**
* INTERNAL API
*/
private[akka] object MultiStreamOutputProcessor {
case class SubstreamRequestMore(substream: ActorRef, demand: Int)
case class SubstreamCancel(substream: ActorRef)
case class SubstreamKey(id: Long)
case class SubstreamRequestMore(substream: SubstreamKey, demand: Int)
case class SubstreamCancel(substream: SubstreamKey)
case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any])
class SubstreamSubscription(val parent: ActorRef, val substream: ActorRef) extends Subscription {
class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription {
override def request(elements: Int): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
else parent ! SubstreamRequestMore(substream, elements)
override def cancel(): Unit = parent ! SubstreamCancel(substream)
else parent ! SubstreamRequestMore(substreamKey, elements)
override def cancel(): Unit = parent ! SubstreamCancel(substreamKey)
override def toString = "SubstreamSubscription" + System.identityHashCode(this)
}
@ -32,62 +34,85 @@ private[akka] object MultiStreamOutputProcessor {
*/
private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
import MultiStreamOutputProcessor._
private var nextId = 0
private val substreamOutputs = mutable.Map.empty[SubstreamKey, SubstreamOutputs]
private val substreamOutputs = collection.mutable.Map.empty[ActorRef, SubstreamOutputs]
class SubstreamOutputs(val key: SubstreamKey) extends SimpleOutputs(self, this) with Publisher[Any] {
class SubstreamOutputs extends Outputs {
private var completed: Boolean = false
private var demands: Int = 0
sealed trait PublisherState
sealed trait CompletedState extends PublisherState
case object Open extends PublisherState
final case class Attached(sub: Subscriber[Any]) extends PublisherState
case object Completed extends CompletedState
final case class Failed(e: Throwable) extends CompletedState
private val subscription = new SubstreamSubscription(self, key)
private val state = new AtomicReference[PublisherState](Open)
override def subreceive: SubReceive =
throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block")
val substream = context.watch(context.actorOf(
IdentityProcessorImpl.props(settings)
.withDispatcher(context.props.dispatcher)))
val processor = ActorProcessor[AnyRef, AnyRef](substream)
override def isClosed: Boolean = completed
override def complete(): Unit = {
if (!completed) substream ! OnComplete
completed = true
def enqueueOutputDemand(demand: Int): Unit = {
downstreamDemand += demand
pump.pump()
}
override def cancel(e: Throwable): Unit = {
if (!completed) substream ! OnError(e)
completed = true
if (!downstreamCompleted) {
closePublisher(Failed(e))
downstreamCompleted = true
}
}
override def enqueueOutputElement(elem: Any): Unit = {
demands -= 1
substream ! OnNext(elem)
override def complete(): Unit = {
if (!downstreamCompleted) {
closePublisher(Completed)
downstreamCompleted = true
}
}
def enqueueOutputDemand(demand: Int): Unit = demands += demand
override def demandAvailable: Boolean = demands > 0
override val NeedsDemand: TransferState = new TransferState {
override def isReady: Boolean = demandAvailable
override def isCompleted: Boolean = completed
private def closePublisher(withState: CompletedState): Unit = {
state.getAndSet(withState) match {
case Attached(sub) closeSubscriber(sub, withState)
case _: CompletedState throw new IllegalStateException("Attempted to double shutdown publisher")
case Open // No action needed
}
}
override val NeedsDemandOrCancel: TransferState = new TransferState {
override def isReady: Boolean = demandAvailable || isClosed
override def isCompleted: Boolean = false
private def closeSubscriber(s: Subscriber[Any], withState: CompletedState): Unit = withState match {
case Completed s.onComplete()
case Failed(e) s.onError(e)
}
override def subscribe(s: Subscriber[Any]): Unit = {
if (state.compareAndSet(Open, Attached(s))) self ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher"))
case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
}
}
}
def attachSubscriber(s: Subscriber[Any]): Unit =
if (subscriber eq null) {
subscriber = s
subscriber.onSubscribe(subscription)
} else subscriber.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher"))
}
protected def newSubstream(): SubstreamOutputs = {
val outputs = new SubstreamOutputs
outputs.substream ! OnSubscribe(new SubstreamSubscription(self, outputs.substream))
substreamOutputs(outputs.substream) = outputs
val id = SubstreamKey(nextId)
nextId += 1
val outputs = new SubstreamOutputs(id)
substreamOutputs(outputs.key) = outputs
outputs
}
def fullyCompleted: Boolean = primaryOutputsShutdown && isPumpFinished && context.children.isEmpty
protected def invalidateSubstream(substream: ActorRef): Unit = {
protected def invalidateSubstream(substream: SubstreamKey): Unit = {
substreamOutputs(substream).complete()
substreamOutputs -= substream
shutdownHooks()
pump()
}
@ -96,21 +121,15 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
super.fail(e)
}
// FIXME: proper shutdown scheduling
override def shutdownHooks(): Unit = if (fullyCompleted) super.shutdownHooks()
override def pumpFinished(): Unit = {
context.children foreach (_ ! OnComplete)
substreamOutputs.values foreach (_.complete())
super.pumpFinished()
}
val substreamManagement: Receive = {
case SubstreamRequestMore(key, demand)
substreamOutputs(key).enqueueOutputDemand(demand)
pump()
case SubstreamCancel(key) // FIXME: Terminated should handle this case. Maybe remove SubstreamCancel and just Poison self?
case Terminated(child) invalidateSubstream(child)
case SubstreamRequestMore(key, demand) substreamOutputs(key).enqueueOutputDemand(demand)
case SubstreamCancel(key) invalidateSubstream(key)
case SubstreamSubscribe(key, subscriber) substreamOutputs(key).attachSubscriber(subscriber)
}
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
@ -138,7 +157,7 @@ private[akka] object TwoStreamInputProcessor {
*/
private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSettings, val other: Publisher[Any])
extends ActorProcessorImpl(_settings) {
import TwoStreamInputProcessor._
import akka.stream.impl.TwoStreamInputProcessor._
val secondaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) {
override val subreceive: SubReceive = new SubReceive(waitingForUpstream)
@ -166,10 +185,11 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
other.subscribe(new OtherActorSubscriber(self))
override def shutdownHooks(): Unit = {
override def pumpFinished(): Unit = {
secondaryInputs.cancel()
super.shutdownHooks()
super.pumpFinished()
}
}
/**
@ -195,7 +215,7 @@ private[akka] object MultiStreamInputProcessor {
* INTERNAL API
*/
private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
import MultiStreamInputProcessor._
import akka.stream.impl.MultiStreamInputProcessor._
var nextId = 0
private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInputs]
@ -246,9 +266,9 @@ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSe
super.fail(e)
}
override def shutdownHooks(): Unit = {
override def pumpFinished(): Unit = {
substreamInputs.values foreach (_.cancel())
super.shutdownHooks()
super.pumpFinished()
}
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement