=str #16923: Inject identity between SubscriberSource and PublisherSink

This commit is contained in:
Endre Sándor Varga 2015-06-06 14:36:49 +02:00 committed by Endre Sándor Varga
parent 632868b868
commit 74843eccaf
7 changed files with 41 additions and 21 deletions

View file

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReferen
import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl.Keep
import akka.stream._
import org.reactivestreams.{ Subscription, Publisher, Subscriber }
import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber }
import scala.collection.mutable
import scala.util.control.NonFatal
import akka.event.Logging.simpleName
@ -293,10 +293,21 @@ private[akka] object StreamLayout {
}
}
private[stream] final class SubscriberSourceVirtualProcessor[T] extends Processor[T, T] {
@volatile private var subscriber: Subscriber[_ >: T] = null
override def subscribe(s: Subscriber[_ >: T]): Unit = subscriber = s
override def onError(t: Throwable): Unit = subscriber.onError(t)
override def onSubscribe(s: Subscription): Unit = subscriber.onSubscribe(s)
override def onComplete(): Unit = subscriber.onComplete()
override def onNext(t: T): Unit = subscriber.onNext(t)
}
/**
* INTERNAL API
*/
private[stream] class VirtualSubscriber[T](val owner: VirtualPublisher[T]) extends Subscriber[T] {
private[stream] final class PublisherSinkVirtualSubscriber[T](val owner: PublisherSinkVirtualPublisher[T]) extends Subscriber[T] {
override def onSubscribe(s: Subscription): Unit = throw new UnsupportedOperationException("This method should not be called")
override def onError(t: Throwable): Unit = throw new UnsupportedOperationException("This method should not be called")
override def onComplete(): Unit = throw new UnsupportedOperationException("This method should not be called")
@ -306,7 +317,7 @@ private[stream] class VirtualSubscriber[T](val owner: VirtualPublisher[T]) exten
/**
* INTERNAL API
*/
private[stream] class VirtualPublisher[T]() extends Publisher[T] {
private[stream] final class PublisherSinkVirtualPublisher[T]() extends Publisher[T] {
@volatile var realPublisher: Publisher[T] = null
override def subscribe(s: Subscriber[_ >: T]): Unit = {
val sub = realPublisher.subscribe(s)
@ -320,7 +331,7 @@ private[stream] class VirtualPublisher[T]() extends Publisher[T] {
/**
* INTERNAL API
*/
private[stream] case class MaterializedValueSource[M](
private[stream] final case class MaterializedValueSource[M](
shape: SourceShape[M] = SourceShape[M](new Outlet[M]("Materialized.out")),
attributes: OperationAttributes = OperationAttributes.name("Materialized")) extends StreamLayout.Module {
@ -562,6 +573,8 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
protected def materializeAtomic(atomic: Module, effectiveAttributes: OperationAttributes): Any
protected def createIdentityProcessor: Processor[Any, Any]
private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, Any]): Any = matNode match {
case Atomic(m) materializedValues(m)
case Combine(f, d1, d2) f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
@ -570,8 +583,15 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
}
private def attach(p: Publisher[Any], s: Subscriber[Any]) = s match {
case v: VirtualSubscriber[Any] v.owner.realPublisher = p
case _ p.subscribe(s)
case v: PublisherSinkVirtualSubscriber[Any]
if (p.isInstanceOf[SubscriberSourceVirtualProcessor[Any]]) {
val injectedProcessor = createIdentityProcessor
v.owner.realPublisher = injectedProcessor
p.subscribe(injectedProcessor)
} else
v.owner.realPublisher = p
case _
p.subscribe(s)
}
final protected def assignPort(in: InPort, subscriber: Subscriber[Any]): Unit = {