=str add VirtualProcessor

- create a fully RS compliant minimal identity processor (that is not an
  Actor)
- replace SubscriberSourceVirtualProcessor,
  PublisherSinkVirtualSubscriber and PublisherSinkVirtualPublisher with
  this
- add tests that transform Sink.publisher’s and Source.subscriber’s
  materialized value
- also remove the Keep.{left, right} optimization in order to make
  side-effecting mat value transforms execute even if their values are
  discarded
This commit is contained in:
Roland Kuhn 2015-06-16 15:34:54 +02:00
parent 52f08f035a
commit 6e72271eb5
9 changed files with 182 additions and 56 deletions

View file

@ -4,7 +4,6 @@
package akka.stream.impl
import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference }
import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl.Keep
import akka.stream._
@ -12,6 +11,8 @@ import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber }
import scala.collection.mutable
import scala.util.control.NonFatal
import akka.event.Logging.simpleName
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicLong
/**
* INTERNAL API
@ -106,9 +107,8 @@ private[akka] object StreamLayout {
AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets),
downstreams ++ that.downstreams,
upstreams ++ that.upstreams,
if (f eq Keep.left) matComputation1
else if (f eq Keep.right) matComputation2
else Combine(f.asInstanceOf[(Any, Any) Any], matComputation1, matComputation2),
// would like to optimize away this allocation for Keep.{left,right} but that breaks side-effecting transformations
Combine(f.asInstanceOf[(Any, Any) Any], matComputation1, matComputation2),
attributes)
}
@ -293,38 +293,108 @@ private[akka] object StreamLayout {
}
}
private[stream] final class SubscriberSourceVirtualProcessor[T] extends Processor[T, T] {
@volatile private var subscriber: Subscriber[_ >: T] = null
private[stream] object VirtualProcessor {
sealed trait Termination
case object Allowed extends Termination
case object Completed extends Termination
case class Failed(ex: Throwable) extends Termination
override def subscribe(s: Subscriber[_ >: T]): Unit = subscriber = s
override def onError(t: Throwable): Unit = subscriber.onError(t)
override def onSubscribe(s: Subscription): Unit = subscriber.onSubscribe(s)
override def onComplete(): Unit = subscriber.onComplete()
override def onNext(t: T): Unit = subscriber.onNext(t)
private object InertSubscriber extends Subscriber[Any] {
override def onSubscribe(s: Subscription): Unit = s.cancel()
override def onNext(elem: Any): Unit = ()
override def onError(thr: Throwable): Unit = ()
override def onComplete(): Unit = ()
}
}
/**
* INTERNAL API
*/
private[stream] final class PublisherSinkVirtualSubscriber[T](val owner: PublisherSinkVirtualPublisher[T]) extends Subscriber[T] {
override def onSubscribe(s: Subscription): Unit = throw new UnsupportedOperationException("This method should not be called")
override def onError(t: Throwable): Unit = throw new UnsupportedOperationException("This method should not be called")
override def onComplete(): Unit = throw new UnsupportedOperationException("This method should not be called")
override def onNext(t: T): Unit = throw new UnsupportedOperationException("This method should not be called")
}
private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
import VirtualProcessor._
import ReactiveStreamsCompliance._
private val subscriptionStatus = new AtomicReference[AnyRef]
private val terminationStatus = new AtomicReference[Termination]
/**
* INTERNAL API
*/
private[stream] final class PublisherSinkVirtualPublisher[T]() extends Publisher[T] {
@volatile var realPublisher: Publisher[T] = null
override def subscribe(s: Subscriber[_ >: T]): Unit = {
val sub = realPublisher.subscribe(s)
// unreference the realPublisher to facilitate GC and
// Sink.publisher is supposed to reject additional subscribers anyway
realPublisher = RejectAdditionalSubscribers[T]
sub
requireNonNullSubscriber(s)
if (subscriptionStatus.compareAndSet(null, s)) () // wait for onSubscribe
else
subscriptionStatus.get match {
case sub: Subscriber[_] rejectAdditionalSubscriber(s, "VirtualProcessor")
case sub: Sub
try {
subscriptionStatus.set(s)
tryOnSubscribe(s, sub)
sub.closeLatch() // allow onNext only now
terminationStatus.getAndSet(Allowed) match {
case null // nothing happened yet
case Completed tryOnComplete(s)
case Failed(ex) tryOnError(s, ex)
case Allowed // all good
}
} catch {
case NonFatal(ex) sub.cancel()
}
}
}
override def onSubscribe(s: Subscription): Unit = {
requireNonNullSubscription(s)
val wrapped = new Sub(s)
if (subscriptionStatus.compareAndSet(null, wrapped)) () // wait for Subscriber
else
subscriptionStatus.get match {
case sub: Subscriber[_]
terminationStatus.get match {
case Allowed
/*
* There is a race condition here: if this thread reads the subscriptionStatus after
* set set() in subscribe() but then sees the terminationStatus before the getAndSet()
* is published then we will rely upon the downstream Subscriber for cancelling this
* Subscription. I only mention this because the TCK requires that we handle this here
* (since the manualSubscriber used there does not expose this behavior).
*/
s.cancel()
case _
tryOnSubscribe(sub, wrapped)
wrapped.closeLatch() // allow onNext only now
terminationStatus.set(Allowed)
}
case sub: Subscription
s.cancel() // reject further Subscriptions
}
}
override def onError(t: Throwable): Unit = {
requireNonNullException(t)
if (terminationStatus.compareAndSet(null, Failed(t))) () // let it be picked up by subscribe()
else tryOnError(subscriptionStatus.get.asInstanceOf[Subscriber[T]], t)
}
override def onComplete(): Unit =
if (terminationStatus.compareAndSet(null, Completed)) () // let it be picked up by subscribe()
else tryOnComplete(subscriptionStatus.get.asInstanceOf[Subscriber[T]])
override def onNext(t: T): Unit = {
requireNonNullElement(t)
tryOnNext(subscriptionStatus.get.asInstanceOf[Subscriber[T]], t)
}
private final class Sub(s: Subscription) extends AtomicLong with Subscription {
override def cancel(): Unit = {
subscriptionStatus.set(InertSubscriber)
s.cancel()
}
@tailrec
override def request(n: Long): Unit = {
val current = get
if (current < 0) s.request(n)
else if (compareAndSet(current, current + n)) ()
else request(n)
}
def closeLatch(): Unit = {
val requested = getAndSet(-1)
if (requested > 0) s.request(requested)
}
}
}
@ -550,6 +620,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
case mv: MaterializedValueSource[_]
val pub = new MaterializedValuePublisher
materializedValuePublishers ::= pub
materializedValues.put(mv, ())
assignPort(mv.shape.outlet, pub)
case atomic if atomic.isAtomic
materializedValues.put(atomic, materializeAtomic(atomic, subEffectiveAttributes))
@ -573,8 +644,6 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
protected def materializeAtomic(atomic: Module, effectiveAttributes: OperationAttributes): Any
protected def createIdentityProcessor: Processor[Any, Any]
private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, Any]): Any = matNode match {
case Atomic(m) materializedValues(m)
case Combine(f, d1, d2) f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
@ -582,24 +651,12 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
case Ignore ()
}
private def attach(p: Publisher[Any], s: Subscriber[Any]) = s match {
case v: PublisherSinkVirtualSubscriber[Any]
if (p.isInstanceOf[SubscriberSourceVirtualProcessor[Any]]) {
val injectedProcessor = createIdentityProcessor
v.owner.realPublisher = injectedProcessor
p.subscribe(injectedProcessor)
} else
v.owner.realPublisher = p
case _
p.subscribe(s)
}
final protected def assignPort(in: InPort, subscriber: Subscriber[Any]): Unit = {
subscribers(in) = subscriber
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.inPorts(in)) {
val publisher = publishers(currentLayout.upstreams(in))
if (publisher ne null) attach(publisher, subscriber)
if (publisher ne null) publisher.subscribe(subscriber)
}
}
@ -608,7 +665,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.outPorts(out)) {
val subscriber = subscribers(currentLayout.downstreams(out))
if (subscriber ne null) attach(publisher, subscriber)
if (subscriber ne null) publisher.subscribe(subscriber)
}
}