2014-03-30 09:27:19 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
package akka.stream.impl
|
|
|
|
|
|
2014-04-08 13:37:55 +02:00
|
|
|
import akka.stream.MaterializerSettings
|
2014-05-12 16:45:30 +02:00
|
|
|
import akka.actor.{ Actor, Terminated, ActorRef }
|
2014-03-30 09:27:19 +02:00
|
|
|
import org.reactivestreams.spi.{ Subscriber, Subscription }
|
|
|
|
|
import org.reactivestreams.api.Producer
|
2014-05-16 17:08:44 +02:00
|
|
|
import akka.stream.actor.ActorConsumer.{ OnNext, OnError, OnComplete, OnSubscribe }
|
2014-03-30 09:27:19 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[akka] object MultiStreamOutputProcessor {
|
|
|
|
|
case class SubstreamRequestMore(substream: ActorRef, demand: Int)
|
|
|
|
|
case class SubstreamCancel(substream: ActorRef)
|
|
|
|
|
|
2014-04-01 12:45:41 +02:00
|
|
|
class SubstreamSubscription(val parent: ActorRef, val substream: ActorRef) extends Subscription {
|
2014-03-30 09:27:19 +02:00
|
|
|
override def requestMore(elements: Int): Unit =
|
|
|
|
|
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
|
|
|
|
|
else parent ! SubstreamRequestMore(substream, elements)
|
|
|
|
|
override def cancel(): Unit = parent ! SubstreamCancel(substream)
|
|
|
|
|
override def toString = "SubstreamSubscription" + System.identityHashCode(this)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2014-04-08 13:37:55 +02:00
|
|
|
private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
|
2014-03-30 09:27:19 +02:00
|
|
|
import MultiStreamOutputProcessor._
|
|
|
|
|
|
|
|
|
|
private val substreamOutputs = collection.mutable.Map.empty[ActorRef, SubstreamOutputs]
|
|
|
|
|
|
|
|
|
|
class SubstreamOutputs extends Outputs {
|
|
|
|
|
private var completed: Boolean = false
|
|
|
|
|
private var demands: Int = 0
|
|
|
|
|
|
2014-05-16 14:21:15 +02:00
|
|
|
override def subreceive: SubReceive =
|
2014-05-12 16:45:30 +02:00
|
|
|
throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block")
|
|
|
|
|
|
|
|
|
|
val substream = context.watch(context.actorOf(
|
|
|
|
|
IdentityProcessorImpl.props(settings)
|
|
|
|
|
.withDispatcher(context.props.dispatcher)))
|
2014-03-30 09:27:19 +02:00
|
|
|
val processor = new ActorProcessor[AnyRef, AnyRef](substream)
|
|
|
|
|
|
2014-04-01 12:45:41 +02:00
|
|
|
override def isClosed: Boolean = completed
|
2014-03-30 09:27:19 +02:00
|
|
|
override def complete(): Unit = {
|
|
|
|
|
if (!completed) substream ! OnComplete
|
|
|
|
|
completed = true
|
|
|
|
|
}
|
|
|
|
|
|
2014-04-08 10:28:27 +02:00
|
|
|
override def cancel(e: Throwable): Unit = {
|
|
|
|
|
if (!completed) substream ! OnError(e)
|
|
|
|
|
completed = true
|
|
|
|
|
}
|
2014-03-30 09:27:19 +02:00
|
|
|
|
|
|
|
|
override def enqueueOutputElement(elem: Any): Unit = {
|
|
|
|
|
demands -= 1
|
|
|
|
|
substream ! OnNext(elem)
|
|
|
|
|
}
|
2014-04-01 12:45:41 +02:00
|
|
|
|
2014-03-30 09:27:19 +02:00
|
|
|
def enqueueOutputDemand(demand: Int): Unit = demands += demand
|
|
|
|
|
override def demandAvailable: Boolean = demands > 0
|
|
|
|
|
override val NeedsDemand: TransferState = new TransferState {
|
|
|
|
|
override def isReady: Boolean = demandAvailable
|
|
|
|
|
override def isCompleted: Boolean = completed
|
|
|
|
|
}
|
|
|
|
|
override val NeedsDemandOrCancel: TransferState = new TransferState {
|
2014-04-01 12:45:41 +02:00
|
|
|
override def isReady: Boolean = demandAvailable || isClosed
|
2014-03-30 09:27:19 +02:00
|
|
|
override def isCompleted: Boolean = false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected def newSubstream(): SubstreamOutputs = {
|
|
|
|
|
val outputs = new SubstreamOutputs
|
|
|
|
|
outputs.substream ! OnSubscribe(new SubstreamSubscription(self, outputs.substream))
|
|
|
|
|
substreamOutputs(outputs.substream) = outputs
|
|
|
|
|
outputs
|
|
|
|
|
}
|
|
|
|
|
|
2014-05-12 16:45:30 +02:00
|
|
|
def fullyCompleted: Boolean = primaryOutputsShutdown && isPumpFinished && context.children.isEmpty
|
2014-04-08 10:28:27 +02:00
|
|
|
|
2014-03-30 09:27:19 +02:00
|
|
|
protected def invalidateSubstream(substream: ActorRef): Unit = {
|
|
|
|
|
substreamOutputs(substream).complete()
|
|
|
|
|
substreamOutputs -= substream
|
2014-05-12 16:45:30 +02:00
|
|
|
shutdownHooks()
|
2014-03-30 09:27:19 +02:00
|
|
|
pump()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def fail(e: Throwable): Unit = {
|
2014-04-08 10:28:27 +02:00
|
|
|
substreamOutputs.values foreach (_.cancel(e))
|
2014-03-30 09:27:19 +02:00
|
|
|
super.fail(e)
|
|
|
|
|
}
|
|
|
|
|
|
2014-05-12 16:45:30 +02:00
|
|
|
// FIXME: proper shutdown scheduling
|
|
|
|
|
override def shutdownHooks(): Unit = if (fullyCompleted) super.shutdownHooks()
|
2014-03-30 09:27:19 +02:00
|
|
|
|
2014-04-08 10:28:27 +02:00
|
|
|
override def pumpFinished(): Unit = {
|
2014-03-30 09:27:19 +02:00
|
|
|
context.children foreach (_ ! OnComplete)
|
2014-04-08 10:28:27 +02:00
|
|
|
super.pumpFinished()
|
2014-03-30 09:27:19 +02:00
|
|
|
}
|
|
|
|
|
|
2014-05-12 16:45:30 +02:00
|
|
|
val substreamManagement: Receive = {
|
2014-03-30 09:27:19 +02:00
|
|
|
case SubstreamRequestMore(key, demand) ⇒
|
|
|
|
|
substreamOutputs(key).enqueueOutputDemand(demand)
|
|
|
|
|
pump()
|
|
|
|
|
case SubstreamCancel(key) ⇒ // FIXME: Terminated should handle this case. Maybe remove SubstreamCancel and just Poison self?
|
|
|
|
|
case Terminated(child) ⇒ invalidateSubstream(child)
|
|
|
|
|
|
|
|
|
|
}
|
2014-05-12 16:45:30 +02:00
|
|
|
|
2014-05-16 14:21:15 +02:00
|
|
|
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
|
2014-03-30 09:27:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[akka] object TwoStreamInputProcessor {
|
2014-04-01 12:45:41 +02:00
|
|
|
class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] {
|
2014-06-10 14:09:08 +02:00
|
|
|
override def onError(cause: Throwable): Unit = impl ! OtherStreamOnError(cause)
|
2014-03-30 09:27:19 +02:00
|
|
|
override def onComplete(): Unit = impl ! OtherStreamOnComplete
|
|
|
|
|
override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element)
|
|
|
|
|
override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case object OtherStreamOnComplete
|
|
|
|
|
case class OtherStreamOnNext(element: Any)
|
|
|
|
|
case class OtherStreamOnSubscribe(subscription: Subscription)
|
2014-06-10 14:09:08 +02:00
|
|
|
case class OtherStreamOnError(ex: Throwable)
|
2014-03-30 09:27:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2014-04-08 13:37:55 +02:00
|
|
|
private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSettings, val other: Producer[Any])
|
2014-03-30 09:27:19 +02:00
|
|
|
extends ActorProcessorImpl(_settings) {
|
|
|
|
|
import TwoStreamInputProcessor._
|
|
|
|
|
|
2014-05-12 16:45:30 +02:00
|
|
|
val secondaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) {
|
|
|
|
|
override val subreceive: SubReceive = new SubReceive(waitingForUpstream)
|
2014-03-30 09:27:19 +02:00
|
|
|
|
2014-05-12 16:45:30 +02:00
|
|
|
override def inputOnError(e: Throwable): Unit = TwoStreamInputProcessor.this.onError(e)
|
2014-03-30 09:27:19 +02:00
|
|
|
|
2014-05-12 16:45:30 +02:00
|
|
|
override def waitingForUpstream: Receive = {
|
|
|
|
|
case OtherStreamOnComplete ⇒ onComplete()
|
|
|
|
|
case OtherStreamOnSubscribe(subscription) ⇒ onSubscribe(subscription)
|
2014-06-10 14:09:08 +02:00
|
|
|
case OtherStreamOnError(e) ⇒ TwoStreamInputProcessor.this.onError(e)
|
2014-05-12 16:45:30 +02:00
|
|
|
}
|
2014-03-30 09:27:19 +02:00
|
|
|
|
2014-05-12 16:45:30 +02:00
|
|
|
override def upstreamRunning: Receive = {
|
|
|
|
|
case OtherStreamOnNext(element) ⇒ enqueueInputElement(element)
|
|
|
|
|
case OtherStreamOnComplete ⇒ onComplete()
|
2014-06-10 14:09:08 +02:00
|
|
|
case OtherStreamOnError(e) ⇒ TwoStreamInputProcessor.this.onError(e)
|
2014-05-12 16:45:30 +02:00
|
|
|
}
|
|
|
|
|
override protected def completed: Actor.Receive = {
|
|
|
|
|
case OtherStreamOnSubscribe(_) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber")
|
|
|
|
|
}
|
2014-03-30 09:27:19 +02:00
|
|
|
}
|
|
|
|
|
|
2014-05-16 14:21:15 +02:00
|
|
|
override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive
|
2014-03-30 09:27:19 +02:00
|
|
|
|
2014-05-12 16:45:30 +02:00
|
|
|
other.getPublisher.subscribe(new OtherActorSubscriber(self))
|
2014-03-30 09:27:19 +02:00
|
|
|
|
2014-05-12 16:45:30 +02:00
|
|
|
override def shutdownHooks(): Unit = {
|
|
|
|
|
secondaryInputs.cancel()
|
|
|
|
|
super.shutdownHooks()
|
2014-03-30 09:27:19 +02:00
|
|
|
}
|
2014-05-16 14:21:15 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[akka] object MultiStreamInputProcessor {
|
|
|
|
|
case class SubstreamKey(id: Int)
|
|
|
|
|
|
|
|
|
|
class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends Subscriber[T] {
|
|
|
|
|
override def onError(cause: Throwable): Unit = impl ! SubstreamOnError(key, cause)
|
|
|
|
|
override def onComplete(): Unit = impl ! SubstreamOnComplete(key)
|
|
|
|
|
override def onNext(element: T): Unit = impl ! SubstreamOnNext(key, element)
|
|
|
|
|
override def onSubscribe(subscription: Subscription): Unit = impl ! SubstreamStreamOnSubscribe(key, subscription)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case class SubstreamOnComplete(key: SubstreamKey)
|
|
|
|
|
case class SubstreamOnNext(key: SubstreamKey, element: Any)
|
|
|
|
|
case class SubstreamOnError(key: SubstreamKey, e: Throwable)
|
|
|
|
|
case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
|
|
|
|
|
import MultiStreamInputProcessor._
|
|
|
|
|
var nextId = 0
|
|
|
|
|
|
|
|
|
|
private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInputs]
|
|
|
|
|
|
|
|
|
|
class SubstreamInputs(val key: SubstreamKey) extends BatchingInputBuffer(settings.initialInputBufferSize, pump = this) {
|
|
|
|
|
// Not driven directly
|
|
|
|
|
override val subreceive = new SubReceive(Actor.emptyBehavior)
|
|
|
|
|
|
|
|
|
|
def substreamOnComplete(): Unit = onComplete()
|
|
|
|
|
def substreamOnSubscribe(subscription: Subscription): Unit = onSubscribe(subscription)
|
|
|
|
|
def substreamOnError(e: Throwable): Unit = onError(e)
|
|
|
|
|
def substreamOnNext(elem: Any): Unit = enqueueInputElement(elem)
|
|
|
|
|
|
|
|
|
|
override protected def inputOnError(e: Throwable): Unit = {
|
|
|
|
|
super.inputOnError(e)
|
|
|
|
|
invalidateSubstream(key, e)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val substreamManagement: Receive = {
|
|
|
|
|
case SubstreamStreamOnSubscribe(key, subscription) ⇒ substreamInputs(key).substreamOnSubscribe(subscription)
|
|
|
|
|
case SubstreamOnNext(key, element) ⇒ substreamInputs(key).substreamOnNext(element)
|
|
|
|
|
case SubstreamOnComplete(key) ⇒ {
|
|
|
|
|
substreamInputs(key).substreamOnComplete()
|
|
|
|
|
substreamInputs -= key
|
|
|
|
|
}
|
|
|
|
|
case SubstreamOnError(key, e) ⇒ substreamInputs(key).substreamOnError(e)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def createSubstreamInputs(p: Producer[Any]): SubstreamInputs = {
|
|
|
|
|
val key = SubstreamKey(nextId)
|
|
|
|
|
val inputs = new SubstreamInputs(key)
|
|
|
|
|
p.getPublisher.subscribe(new SubstreamSubscriber(self, key))
|
|
|
|
|
substreamInputs(key) = inputs
|
|
|
|
|
nextId += 1
|
|
|
|
|
inputs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = {
|
|
|
|
|
substreamInputs(substream).cancel()
|
|
|
|
|
substreamInputs -= substream
|
|
|
|
|
pump()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def fail(e: Throwable): Unit = {
|
|
|
|
|
substreamInputs.values foreach (_.cancel())
|
|
|
|
|
super.fail(e)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def shutdownHooks(): Unit = {
|
|
|
|
|
substreamInputs.values foreach (_.cancel())
|
|
|
|
|
super.shutdownHooks()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
|
2014-03-30 09:27:19 +02:00
|
|
|
}
|