fix and document VirtualProcessor, fixes #19790

This commit is contained in:
Roland Kuhn 2016-02-24 11:55:28 +01:00
parent 93738779d3
commit 9db36727c0
6 changed files with 302 additions and 111 deletions

View file

@ -8,7 +8,6 @@ import java.{ util ⇒ ju }
import akka.NotUsed
import akka.stream.impl.MaterializerSession.MaterializationPanic
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.scaladsl.Keep
import akka.stream._
import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber }
@ -19,7 +18,6 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import akka.stream.impl.fusing.GraphStageModule
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.fusing.GraphModule
/**
@ -363,12 +361,12 @@ object StreamLayout {
}
final case class CompositeModule(
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes) extends Module {
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes) extends Module {
override def replaceShape(s: Shape): Module =
if (s != shape) {
@ -395,13 +393,13 @@ object StreamLayout {
}
final case class FusedModule(
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes,
info: Fusing.StructuralInfo) extends Module {
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes,
info: Fusing.StructuralInfo) extends Module {
override def isFused: Boolean = true
@ -426,108 +424,282 @@ object StreamLayout {
}
}
/**
* INTERNAL API
*/
private[stream] object VirtualProcessor {
sealed trait Termination
case object Allowed extends Termination
case object Completed extends Termination
case class Failed(ex: Throwable) extends Termination
private val InertSubscriber = new CancellingSubscriber[Any]
case object Inert {
val subscriber = new CancellingSubscriber[Any]
}
case class Both(subscriber: Subscriber[Any])
object Both {
def create(s: Subscriber[_]) = Both(s.asInstanceOf[Subscriber[Any]])
}
}
private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
/**
* INTERNAL API
*
* This is a transparent processor that shall consume as little resources as
* possible. Due to the possibility of receiving uncoordinated inputs from both
* downstream and upstream, this needs an atomic state machine which looks a
* little like this:
*
* +--------------+ (2) +------------+
* | null | ----------> | Subscriber |
* +--------------+ +------------+
* | |
* (1) | | (1)
* \|/ \|/
* +--------------+ (2) +------------+ --\
* | Subscription | ----------> | Both | | (4)
* +--------------+ +------------+ <-/
* | |
* (3) | | (3)
* \|/ \|/
* +--------------+ (2) +------------+ --\
* | Publisher | ----------> | Inert | | (4, *)
* +--------------+ +------------+ <-/
*
* The idea is to keep the major state in only one atomic reference. The actions
* that can happen are:
*
* (1) onSubscribe
* (2) subscribe
* (3) onError / onComplete
* (4) onNext
* (*) Inert can be reached also by cancellation after which onNext is still fine
* so we just silently ignore possible spec violations here
*
* Any event that occurs in a state where no matching outgoing arrow can be found
* is a spec violation, leading to the shutdown of this processor (meaning that
* the state is updated such that all following actions match that of a failed
* Publisher or a cancelling Subscriber, and the non-guilty party is informed if
* already connected).
*
* request() can only be called after the Subscriber has received the Subscription
* and that also means that onNext() will only happen after having transitioned into
* the Both state as well. The Publisher state means that if the real
* Publisher terminates before we get the Subscriber, we can just forget about the
* real one and keep an already finished one around for the Subscriber.
*
* The Subscription that is offered to the Subscriber must cancel the original
* Publisher if things go wrong (like `request(0)` coming in from downstream) and
* it must ensure that we drop the Subscriber reference when `cancel` is invoked.
*/
private[stream] final class VirtualProcessor[T] extends AtomicReference[AnyRef] with Processor[T, T] {
import VirtualProcessor._
import ReactiveStreamsCompliance._
private val subscriptionStatus = new AtomicReference[AnyRef]
private val terminationStatus = new AtomicReference[Termination]
override def subscribe(s: Subscriber[_ >: T]): Unit = {
requireNonNullSubscriber(s)
if (subscriptionStatus.compareAndSet(null, s)) () // wait for onSubscribe
else
subscriptionStatus.get match {
case sub: Subscriber[_] rejectAdditionalSubscriber(s, "VirtualProcessor")
case sub: Sub
try {
subscriptionStatus.set(s)
tryOnSubscribe(s, sub)
sub.closeLatch() // allow onNext only now
terminationStatus.getAndSet(Allowed) match {
case null // nothing happened yet
case VirtualProcessor.Completed tryOnComplete(s)
case VirtualProcessor.Failed(ex) tryOnError(s, ex)
case VirtualProcessor.Allowed // all good
}
} catch {
case NonFatal(ex) sub.cancel()
}
@tailrec def rec(sub: Subscriber[Any]): Unit =
get() match {
case null => if (!compareAndSet(null, s)) rec(sub)
case subscription: Subscription =>
if (compareAndSet(subscription, Both(sub))) establishSubscription(sub, subscription)
else rec(sub)
case pub: Publisher[_] =>
if (compareAndSet(pub, Inert)) pub.subscribe(sub)
else rec(sub)
case _ =>
rejectAdditionalSubscriber(sub, "VirtualProcessor")
}
if (s == null) {
val ex = subscriberMustNotBeNullException
try rec(Inert.subscriber)
finally throw ex // must throw NPE, rule 2:13
} else rec(s.asInstanceOf[Subscriber[Any]])
}
override def onSubscribe(s: Subscription): Unit = {
requireNonNullSubscription(s)
val wrapped = new Sub(s)
if (subscriptionStatus.compareAndSet(null, wrapped)) () // wait for Subscriber
else
subscriptionStatus.get match {
case sub: Subscriber[_]
terminationStatus.get match {
case Allowed
/*
* There is a race condition here: if this thread reads the subscriptionStatus after
* set set() in subscribe() but then sees the terminationStatus before the getAndSet()
* is published then we will rely upon the downstream Subscriber for cancelling this
* Subscription. I only mention this because the TCK requires that we handle this here
* (since the manualSubscriber used there does not expose this behavior).
*/
s.cancel()
case _
tryOnSubscribe(sub, wrapped)
wrapped.closeLatch() // allow onNext only now
terminationStatus.set(Allowed)
override final def onSubscribe(s: Subscription): Unit = {
@tailrec def rec(obj: AnyRef): Unit =
get() match {
case null => if (!compareAndSet(null, obj)) rec(obj)
case subscriber: Subscriber[_] =>
obj match {
case subscription: Subscription =>
if (compareAndSet(subscriber, Both.create(subscriber))) establishSubscription(subscriber, subscription)
else rec(obj)
case pub: Publisher[_] =>
getAndSet(Inert) match {
case Inert => // nothing to be done
case _ => pub.subscribe(subscriber.asInstanceOf[Subscriber[Any]])
}
}
case sub: Subscription
s.cancel() // reject further Subscriptions
case _ =>
// spec violation
tryCancel(s)
}
if (s == null) {
val ex = subscriptionMustNotBeNullException
try rec(ErrorPublisher(ex, "failed-VirtualProcessor"))
finally throw ex // must throw NPE, rule 2:13
} else rec(s)
}
private def establishSubscription(subscriber: Subscriber[_], subscription: Subscription): Unit = {
val wrapped = new WrappedSubscription(subscription)
try subscriber.onSubscribe(wrapped)
catch {
case NonFatal(ex) =>
set(Inert)
tryCancel(subscription)
tryOnError(subscriber, ex)
}
}
override def onError(t: Throwable): Unit = {
requireNonNullException(t)
if (terminationStatus.compareAndSet(null, Failed(t))) () // let it be picked up by subscribe()
else tryOnError(subscriptionStatus.get.asInstanceOf[Subscriber[T]], t)
/*
* `ex` is always a reasonable Throwable that we should communicate downstream,
* but if `t` was `null` then the spec requires us to throw an NPE (which `ex`
* will be in this case).
*/
@tailrec def rec(ex: Throwable): Unit =
get() match {
case null =>
if (!compareAndSet(null, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case s: Subscription =>
if (!compareAndSet(s, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case Both(s) =>
set(Inert)
try tryOnError(s, ex)
finally if (t == null) throw ex // must throw NPE, rule 2:13
case s: Subscriber[_] => // spec violation
getAndSet(Inert) match {
case Inert => // nothing to be done
case _ => ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
}
case _ => // spec violation or cancellation race, but nothing we can do
}
val ex = if (t == null) exceptionMustNotBeNullException else t
rec(ex)
}
override def onComplete(): Unit =
if (terminationStatus.compareAndSet(null, Completed)) () // let it be picked up by subscribe()
else tryOnComplete(subscriptionStatus.get.asInstanceOf[Subscriber[T]])
override def onNext(t: T): Unit = {
requireNonNullElement(t)
tryOnNext(subscriptionStatus.get.asInstanceOf[Subscriber[T]], t)
}
private final class Sub(s: Subscription) extends AtomicLong with Subscription {
override def cancel(): Unit = {
subscriptionStatus.set(InertSubscriber)
s.cancel()
@tailrec override final def onComplete(): Unit =
get() match {
case null => if (!compareAndSet(null, EmptyPublisher)) onComplete()
case s: Subscription => if (!compareAndSet(s, EmptyPublisher)) onComplete()
case Both(s) =>
set(Inert)
tryOnComplete(s)
case s: Subscriber[_] => // spec violation
set(Inert)
EmptyPublisher.subscribe(s)
case _ => // spec violation or cancellation race, but nothing we can do
}
@tailrec
override def onNext(t: T): Unit =
if (t == null) {
val ex = elementMustNotBeNullException
@tailrec def rec(): Unit =
get() match {
case x @ (null | _: Subscription) => if (!compareAndSet(x, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec()
case s: Subscriber[_] => try s.onError(ex) catch { case NonFatal(_) => } finally set(Inert)
case Both(s) => try s.onError(ex) catch { case NonFatal(_) => } finally set(Inert)
case _ => // spec violation or cancellation race, but nothing we can do
}
rec()
throw ex // must throw NPE, rule 2:13
} else {
@tailrec def rec(): Unit =
get() match {
case Both(s) =>
try s.onNext(t)
catch {
case NonFatal(e) =>
set(Inert)
throw new IllegalStateException("Subscriber threw exception, this is in violation of rule 2:13", e)
}
case s: Subscriber[_] => // spec violation
val ex = new IllegalStateException(noDemand)
getAndSet(Inert) match {
case Inert => // nothing to be done
case _ => ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
}
throw ex
case Inert | _: Publisher[_] => // nothing to be done
case other =>
val pub = ErrorPublisher(new IllegalStateException(noDemand), "failed-VirtualPublisher")
if (!compareAndSet(other, pub)) rec()
else throw pub.t
}
rec()
}
private def noDemand = "spec violation: onNext was signaled from upstream without demand"
private class WrappedSubscription(real: Subscription) extends Subscription {
override def request(n: Long): Unit = {
val current = get
if (current < 0) s.request(n)
else if (compareAndSet(current, current + n)) ()
else request(n)
if (n < 1) {
tryCancel(real)
getAndSet(Inert) match {
case Both(s) => rejectDueToNonPositiveDemand(s)
case Inert => // another failure has won the race
case _ => // this cannot possibly happen, but signaling errors is impossible at this point
}
} else real.request(n)
}
def closeLatch(): Unit = {
val requested = getAndSet(-1)
if (requested > 0) s.request(requested)
override def cancel(): Unit = {
set(Inert)
real.cancel()
}
}
}
/**
* INTERNAL API
*
* The implementation of `Sink.asPublisher` needs to offer a `Publisher` that
* defers to the upstream that is connected during materialization. This would
* be trivial if it were not for materialized value computations that may even
* spawn the code that does `pub.subscribe(sub)` in a Future, running concurrently
* with the actual materialization. Therefore we implement a minimial shell here
* that plugs the downstream and the upstream together as soon as both are known.
* Using a VirtualProcessor would technically also work, but it would defeat the
* purpose of subscription timeoutsthe subscription would always already be
* established from the Actors perspective, regardless of whether a downstream
* will ever be connected.
*
* One important consideration is that this `Publisher` must not retain a reference
* to the `Subscriber` after having hooked it up with the real `Publisher`, hence
* the use of `Inert.subscriber` as a tombstone.
*/
private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] {
import VirtualProcessor.Inert
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
requireNonNullSubscriber(subscriber)
@tailrec def rec(): Unit = {
get() match {
case null => if (!compareAndSet(null, subscriber)) rec()
case pub: Publisher[_] =>
if (compareAndSet(pub, Inert.subscriber)) {
pub.asInstanceOf[Publisher[T]].subscribe(subscriber)
} else rec()
case _: Subscriber[_] => rejectAdditionalSubscriber(subscriber, "Sink.asPublisher(fanout = false)")
}
}
rec() // return value is boolean only to make the expressions above compile
}
@tailrec final def registerPublisher(pub: Publisher[_]): Unit =
get() match {
case null => if (!compareAndSet(null, pub)) registerPublisher(pub)
case sub: Subscriber[r] =>
set(Inert.subscriber)
pub.asInstanceOf[Publisher[r]].subscribe(sub)
case _ => throw new IllegalStateException("internal error")
}
}
/**
* INERNAL API
*/
private[stream] object MaterializerSession {
class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace
@ -541,8 +713,9 @@ private[stream] object MaterializerSession {
private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initialAttributes: Attributes) {
import StreamLayout._
private var subscribersStack: List[ju.Map[InPort, Subscriber[Any]]] =
new ju.HashMap[InPort, Subscriber[Any]] :: Nil
// the contained maps store either Subscriber[Any] or VirtualPublisher, but the type system cannot express that
private var subscribersStack: List[ju.Map[InPort, AnyRef]] =
new ju.HashMap[InPort, AnyRef] :: Nil
private var publishersStack: List[ju.Map[OutPort, Publisher[Any]]] =
new ju.HashMap[OutPort, Publisher[Any]] :: Nil
@ -556,7 +729,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
*/
private var moduleStack: List[Module] = topLevel :: Nil
private def subscribers: ju.Map[InPort, Subscriber[Any]] = subscribersStack.head
private def subscribers: ju.Map[InPort, AnyRef] = subscribersStack.head
private def publishers: ju.Map[OutPort, Publisher[Any]] = publishersStack.head
private def currentLayout: Module = moduleStack.head
@ -604,7 +777,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
// (This is an attempt to clean up after an exception during materialization)
val errorPublisher = new ErrorPublisher(new MaterializationPanic(cause), "")
for (subMap subscribersStack; sub subMap.asScala.valuesIterator)
errorPublisher.subscribe(sub)
doSubscribe(errorPublisher, sub)
for (pubMap publishersStack; pub pubMap.asScala.valuesIterator)
pub.subscribe(new CancellingSubscriber)
@ -680,12 +853,12 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
ret
}
final protected def assignPort(in: InPort, subscriber: Subscriber[Any]): Unit = {
subscribers.put(in, subscriber)
final protected def assignPort(in: InPort, subscriberOrVirtual: AnyRef): Unit = {
subscribers.put(in, subscriberOrVirtual)
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.inPorts(in)) {
val publisher = publishers.get(currentLayout.upstreams(in))
if (publisher ne null) publisher.subscribe(subscriber)
if (publisher ne null) doSubscribe(publisher, subscriberOrVirtual)
}
}
@ -694,8 +867,14 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.outPorts(out)) {
val subscriber = subscribers.get(currentLayout.downstreams(out))
if (subscriber ne null) publisher.subscribe(subscriber)
if (subscriber ne null) doSubscribe(publisher, subscriber)
}
}
private def doSubscribe(publisher: Publisher[_ <: Any], subscriberOrVirtual: AnyRef): Unit =
subscriberOrVirtual match {
case s: Subscriber[_] => publisher.subscribe(s.asInstanceOf[Subscriber[Any]])
case v: VirtualPublisher[_] => v.registerPublisher(publisher)
}
}