diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala index b624ef3f5a..66c983f498 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala @@ -79,13 +79,13 @@ private object RenderSupport { override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = { sent += elem.length if (sent > length) - throw InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to more bytes") + ctx fail InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to more bytes") ctx.push(elem) } override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = { if (sent < length) - throw InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to ${length - sent} bytes less") + ctx fail InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to ${length - sent} bytes less") ctx.finish() } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index cf214513ae..f671d5bdd6 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -5,7 +5,6 @@ package akka.http.impl.util import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } - import akka.NotUsed import akka.http.scaladsl.model.RequestEntity import akka.stream._ @@ -16,7 +15,6 @@ import akka.stream.scaladsl._ import akka.stream.stage._ import akka.util.ByteString import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } - import scala.concurrent.{ ExecutionContext, Future, Promise } /** @@ -179,8 +177,8 @@ private[http] object StreamUtils { /** A copy of PublisherSink that allows access to the publisher through the cell but can only materialized once */ private class OneTimePublisherSink[In](attributes: Attributes, shape: SinkShape[In], cell: OneTimeWriteCell[Publisher[In]]) - extends PublisherSink[In](attributes, shape) { - override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { + extends PublisherSink[In](attributes, shape) { + override def create(context: MaterializationContext): (AnyRef, Publisher[In]) = { val results = super.create(context) cell.set(results._2) results @@ -193,7 +191,7 @@ private[http] object StreamUtils { } /** A copy of SubscriberSource that allows access to the subscriber through the cell but can only materialized once */ private class OneTimeSubscriberSource[Out](val attributes: Attributes, shape: SourceShape[Out], cell: OneTimeWriteCell[Subscriber[Out]]) - extends SourceModule[Out, Subscriber[Out]](shape) { + extends SourceModule[Out, Subscriber[Out]](shape) { override def create(context: MaterializationContext): (Publisher[Out], Subscriber[Out]) = { val processor = new Processor[Out, Out] { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index e507d180e7..7c0bb13899 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -742,7 +742,9 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { """ pushInput(header) - expectProtocolErrorOnNetwork() + EventFilter[ProtocolException](occurrences = 1).intercept { + expectProtocolErrorOnNetwork() + } } "control frame bigger than 125 bytes" in new ServerTestSetup { pushInput(frameHeader(Opcode.Ping, 126, fin = true, mask = Some(0))) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 0ce74e67d3..82f20a16ff 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -104,7 +104,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, atomic match { case sink: SinkModule[_, _] ⇒ val (sub, mat) = sink.create(newMaterializationContext()) - assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]]) + assignPort(sink.shape.in, sub) matVal.put(atomic, mat) case source: SourceModule[_, _] ⇒ val (pub, mat) = source.create(newMaterializationContext()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index f7fdac95d7..b1e0b5a32e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -26,7 +26,14 @@ import java.util.Optional */ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module { - def create(context: MaterializationContext): (Subscriber[In] @uncheckedVariance, Mat) + /** + * Create the Subscriber or VirtualPublisher that consumes the incoming + * stream, plus the materialized value. Since Subscriber and VirtualPublisher + * do not share a common supertype apart from AnyRef this is what the type + * union devolves into; unfortunately we do not have union types at our + * disposal at this point. + */ + def create(context: MaterializationContext): (AnyRef, Mat) override def replaceShape(s: Shape): Module = if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that") @@ -59,8 +66,13 @@ private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkSha override def toString: String = "PublisherSink" - override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { - val proc = new VirtualProcessor[In] + /* + * This method is the reason why SinkModule.create may return something that is + * not a Subscriber: a VirtualPublisher is used in order to avoid the immediate + * subscription a VirtualProcessor would perform (and it also saves overhead). + */ + override def create(context: MaterializationContext): (AnyRef, Publisher[In]) = { + val proc = new VirtualPublisher[In] (proc, proc) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index aa5203b503..4510fcc105 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -8,7 +8,6 @@ import java.{ util ⇒ ju } import akka.NotUsed import akka.stream.impl.MaterializerSession.MaterializationPanic import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.fusing.GraphStages.MaterializedValueSource import akka.stream.scaladsl.Keep import akka.stream._ import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber } @@ -19,7 +18,6 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import akka.stream.impl.fusing.GraphStageModule import akka.stream.impl.fusing.GraphStages.MaterializedValueSource -import akka.stream.impl.fusing.GraphStages.MaterializedValueSource import akka.stream.impl.fusing.GraphModule /** @@ -363,12 +361,12 @@ object StreamLayout { } final case class CompositeModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes) extends Module { override def replaceShape(s: Shape): Module = if (s != shape) { @@ -395,13 +393,13 @@ object StreamLayout { } final case class FusedModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes, - info: Fusing.StructuralInfo) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes, + info: Fusing.StructuralInfo) extends Module { override def isFused: Boolean = true @@ -426,108 +424,282 @@ object StreamLayout { } } +/** + * INTERNAL API + */ private[stream] object VirtualProcessor { - sealed trait Termination - case object Allowed extends Termination - case object Completed extends Termination - case class Failed(ex: Throwable) extends Termination - - private val InertSubscriber = new CancellingSubscriber[Any] + case object Inert { + val subscriber = new CancellingSubscriber[Any] + } + case class Both(subscriber: Subscriber[Any]) + object Both { + def create(s: Subscriber[_]) = Both(s.asInstanceOf[Subscriber[Any]]) + } } -private[stream] final class VirtualProcessor[T] extends Processor[T, T] { +/** + * INTERNAL API + * + * This is a transparent processor that shall consume as little resources as + * possible. Due to the possibility of receiving uncoordinated inputs from both + * downstream and upstream, this needs an atomic state machine which looks a + * little like this: + * + * +--------------+ (2) +------------+ + * | null | ----------> | Subscriber | + * +--------------+ +------------+ + * | | + * (1) | | (1) + * \|/ \|/ + * +--------------+ (2) +------------+ --\ + * | Subscription | ----------> | Both | | (4) + * +--------------+ +------------+ <-/ + * | | + * (3) | | (3) + * \|/ \|/ + * +--------------+ (2) +------------+ --\ + * | Publisher | ----------> | Inert | | (4, *) + * +--------------+ +------------+ <-/ + * + * The idea is to keep the major state in only one atomic reference. The actions + * that can happen are: + * + * (1) onSubscribe + * (2) subscribe + * (3) onError / onComplete + * (4) onNext + * (*) Inert can be reached also by cancellation after which onNext is still fine + * so we just silently ignore possible spec violations here + * + * Any event that occurs in a state where no matching outgoing arrow can be found + * is a spec violation, leading to the shutdown of this processor (meaning that + * the state is updated such that all following actions match that of a failed + * Publisher or a cancelling Subscriber, and the non-guilty party is informed if + * already connected). + * + * request() can only be called after the Subscriber has received the Subscription + * and that also means that onNext() will only happen after having transitioned into + * the Both state as well. The Publisher state means that if the real + * Publisher terminates before we get the Subscriber, we can just forget about the + * real one and keep an already finished one around for the Subscriber. + * + * The Subscription that is offered to the Subscriber must cancel the original + * Publisher if things go wrong (like `request(0)` coming in from downstream) and + * it must ensure that we drop the Subscriber reference when `cancel` is invoked. + */ +private[stream] final class VirtualProcessor[T] extends AtomicReference[AnyRef] with Processor[T, T] { import VirtualProcessor._ import ReactiveStreamsCompliance._ - private val subscriptionStatus = new AtomicReference[AnyRef] - private val terminationStatus = new AtomicReference[Termination] - override def subscribe(s: Subscriber[_ >: T]): Unit = { - requireNonNullSubscriber(s) - if (subscriptionStatus.compareAndSet(null, s)) () // wait for onSubscribe - else - subscriptionStatus.get match { - case sub: Subscriber[_] ⇒ rejectAdditionalSubscriber(s, "VirtualProcessor") - case sub: Sub ⇒ - try { - subscriptionStatus.set(s) - tryOnSubscribe(s, sub) - sub.closeLatch() // allow onNext only now - terminationStatus.getAndSet(Allowed) match { - case null ⇒ // nothing happened yet - case VirtualProcessor.Completed ⇒ tryOnComplete(s) - case VirtualProcessor.Failed(ex) ⇒ tryOnError(s, ex) - case VirtualProcessor.Allowed ⇒ // all good - } - } catch { - case NonFatal(ex) ⇒ sub.cancel() - } + @tailrec def rec(sub: Subscriber[Any]): Unit = + get() match { + case null => if (!compareAndSet(null, s)) rec(sub) + case subscription: Subscription => + if (compareAndSet(subscription, Both(sub))) establishSubscription(sub, subscription) + else rec(sub) + case pub: Publisher[_] => + if (compareAndSet(pub, Inert)) pub.subscribe(sub) + else rec(sub) + case _ => + rejectAdditionalSubscriber(sub, "VirtualProcessor") } + + if (s == null) { + val ex = subscriberMustNotBeNullException + try rec(Inert.subscriber) + finally throw ex // must throw NPE, rule 2:13 + } else rec(s.asInstanceOf[Subscriber[Any]]) } - override def onSubscribe(s: Subscription): Unit = { - requireNonNullSubscription(s) - val wrapped = new Sub(s) - if (subscriptionStatus.compareAndSet(null, wrapped)) () // wait for Subscriber - else - subscriptionStatus.get match { - case sub: Subscriber[_] ⇒ - terminationStatus.get match { - case Allowed ⇒ - /* - * There is a race condition here: if this thread reads the subscriptionStatus after - * set set() in subscribe() but then sees the terminationStatus before the getAndSet() - * is published then we will rely upon the downstream Subscriber for cancelling this - * Subscription. I only mention this because the TCK requires that we handle this here - * (since the manualSubscriber used there does not expose this behavior). - */ - s.cancel() - case _ ⇒ - tryOnSubscribe(sub, wrapped) - wrapped.closeLatch() // allow onNext only now - terminationStatus.set(Allowed) + override final def onSubscribe(s: Subscription): Unit = { + @tailrec def rec(obj: AnyRef): Unit = + get() match { + case null => if (!compareAndSet(null, obj)) rec(obj) + case subscriber: Subscriber[_] => + obj match { + case subscription: Subscription => + if (compareAndSet(subscriber, Both.create(subscriber))) establishSubscription(subscriber, subscription) + else rec(obj) + case pub: Publisher[_] => + getAndSet(Inert) match { + case Inert => // nothing to be done + case _ => pub.subscribe(subscriber.asInstanceOf[Subscriber[Any]]) + } } - case sub: Subscription ⇒ - s.cancel() // reject further Subscriptions + case _ => + // spec violation + tryCancel(s) } + + if (s == null) { + val ex = subscriptionMustNotBeNullException + try rec(ErrorPublisher(ex, "failed-VirtualProcessor")) + finally throw ex // must throw NPE, rule 2:13 + } else rec(s) + } + + private def establishSubscription(subscriber: Subscriber[_], subscription: Subscription): Unit = { + val wrapped = new WrappedSubscription(subscription) + try subscriber.onSubscribe(wrapped) + catch { + case NonFatal(ex) => + set(Inert) + tryCancel(subscription) + tryOnError(subscriber, ex) + } } override def onError(t: Throwable): Unit = { - requireNonNullException(t) - if (terminationStatus.compareAndSet(null, Failed(t))) () // let it be picked up by subscribe() - else tryOnError(subscriptionStatus.get.asInstanceOf[Subscriber[T]], t) + /* + * `ex` is always a reasonable Throwable that we should communicate downstream, + * but if `t` was `null` then the spec requires us to throw an NPE (which `ex` + * will be in this case). + */ + @tailrec def rec(ex: Throwable): Unit = + get() match { + case null => + if (!compareAndSet(null, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex) + else if (t == null) throw ex + case s: Subscription => + if (!compareAndSet(s, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex) + else if (t == null) throw ex + case Both(s) => + set(Inert) + try tryOnError(s, ex) + finally if (t == null) throw ex // must throw NPE, rule 2:13 + case s: Subscriber[_] => // spec violation + getAndSet(Inert) match { + case Inert => // nothing to be done + case _ => ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s) + } + case _ => // spec violation or cancellation race, but nothing we can do + } + + val ex = if (t == null) exceptionMustNotBeNullException else t + rec(ex) } - override def onComplete(): Unit = - if (terminationStatus.compareAndSet(null, Completed)) () // let it be picked up by subscribe() - else tryOnComplete(subscriptionStatus.get.asInstanceOf[Subscriber[T]]) - - override def onNext(t: T): Unit = { - requireNonNullElement(t) - tryOnNext(subscriptionStatus.get.asInstanceOf[Subscriber[T]], t) - } - - private final class Sub(s: Subscription) extends AtomicLong with Subscription { - override def cancel(): Unit = { - subscriptionStatus.set(InertSubscriber) - s.cancel() + @tailrec override final def onComplete(): Unit = + get() match { + case null => if (!compareAndSet(null, EmptyPublisher)) onComplete() + case s: Subscription => if (!compareAndSet(s, EmptyPublisher)) onComplete() + case Both(s) => + set(Inert) + tryOnComplete(s) + case s: Subscriber[_] => // spec violation + set(Inert) + EmptyPublisher.subscribe(s) + case _ => // spec violation or cancellation race, but nothing we can do } - @tailrec + + override def onNext(t: T): Unit = + if (t == null) { + val ex = elementMustNotBeNullException + @tailrec def rec(): Unit = + get() match { + case x @ (null | _: Subscription) => if (!compareAndSet(x, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec() + case s: Subscriber[_] => try s.onError(ex) catch { case NonFatal(_) => } finally set(Inert) + case Both(s) => try s.onError(ex) catch { case NonFatal(_) => } finally set(Inert) + case _ => // spec violation or cancellation race, but nothing we can do + } + rec() + throw ex // must throw NPE, rule 2:13 + } else { + @tailrec def rec(): Unit = + get() match { + case Both(s) => + try s.onNext(t) + catch { + case NonFatal(e) => + set(Inert) + throw new IllegalStateException("Subscriber threw exception, this is in violation of rule 2:13", e) + } + case s: Subscriber[_] => // spec violation + val ex = new IllegalStateException(noDemand) + getAndSet(Inert) match { + case Inert => // nothing to be done + case _ => ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s) + } + throw ex + case Inert | _: Publisher[_] => // nothing to be done + case other => + val pub = ErrorPublisher(new IllegalStateException(noDemand), "failed-VirtualPublisher") + if (!compareAndSet(other, pub)) rec() + else throw pub.t + } + rec() + } + + private def noDemand = "spec violation: onNext was signaled from upstream without demand" + + private class WrappedSubscription(real: Subscription) extends Subscription { override def request(n: Long): Unit = { - val current = get - if (current < 0) s.request(n) - else if (compareAndSet(current, current + n)) () - else request(n) + if (n < 1) { + tryCancel(real) + getAndSet(Inert) match { + case Both(s) => rejectDueToNonPositiveDemand(s) + case Inert => // another failure has won the race + case _ => // this cannot possibly happen, but signaling errors is impossible at this point + } + } else real.request(n) } - def closeLatch(): Unit = { - val requested = getAndSet(-1) - if (requested > 0) s.request(requested) + override def cancel(): Unit = { + set(Inert) + real.cancel() } } } /** * INTERNAL API + * + * The implementation of `Sink.asPublisher` needs to offer a `Publisher` that + * defers to the upstream that is connected during materialization. This would + * be trivial if it were not for materialized value computations that may even + * spawn the code that does `pub.subscribe(sub)` in a Future, running concurrently + * with the actual materialization. Therefore we implement a minimial shell here + * that plugs the downstream and the upstream together as soon as both are known. + * Using a VirtualProcessor would technically also work, but it would defeat the + * purpose of subscription timeouts—the subscription would always already be + * established from the Actor’s perspective, regardless of whether a downstream + * will ever be connected. + * + * One important consideration is that this `Publisher` must not retain a reference + * to the `Subscriber` after having hooked it up with the real `Publisher`, hence + * the use of `Inert.subscriber` as a tombstone. + */ +private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] { + import VirtualProcessor.Inert + import ReactiveStreamsCompliance._ + + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = { + requireNonNullSubscriber(subscriber) + @tailrec def rec(): Unit = { + get() match { + case null => if (!compareAndSet(null, subscriber)) rec() + case pub: Publisher[_] => + if (compareAndSet(pub, Inert.subscriber)) { + pub.asInstanceOf[Publisher[T]].subscribe(subscriber) + } else rec() + case _: Subscriber[_] => rejectAdditionalSubscriber(subscriber, "Sink.asPublisher(fanout = false)") + } + } + rec() // return value is boolean only to make the expressions above compile + } + + @tailrec final def registerPublisher(pub: Publisher[_]): Unit = + get() match { + case null => if (!compareAndSet(null, pub)) registerPublisher(pub) + case sub: Subscriber[r] => + set(Inert.subscriber) + pub.asInstanceOf[Publisher[r]].subscribe(sub) + case _ => throw new IllegalStateException("internal error") + } +} + +/** + * INERNAL API */ private[stream] object MaterializerSession { class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace @@ -541,8 +713,9 @@ private[stream] object MaterializerSession { private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initialAttributes: Attributes) { import StreamLayout._ - private var subscribersStack: List[ju.Map[InPort, Subscriber[Any]]] = - new ju.HashMap[InPort, Subscriber[Any]] :: Nil + // the contained maps store either Subscriber[Any] or VirtualPublisher, but the type system cannot express that + private var subscribersStack: List[ju.Map[InPort, AnyRef]] = + new ju.HashMap[InPort, AnyRef] :: Nil private var publishersStack: List[ju.Map[OutPort, Publisher[Any]]] = new ju.HashMap[OutPort, Publisher[Any]] :: Nil @@ -556,7 +729,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo */ private var moduleStack: List[Module] = topLevel :: Nil - private def subscribers: ju.Map[InPort, Subscriber[Any]] = subscribersStack.head + private def subscribers: ju.Map[InPort, AnyRef] = subscribersStack.head private def publishers: ju.Map[OutPort, Publisher[Any]] = publishersStack.head private def currentLayout: Module = moduleStack.head @@ -604,7 +777,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo // (This is an attempt to clean up after an exception during materialization) val errorPublisher = new ErrorPublisher(new MaterializationPanic(cause), "") for (subMap ← subscribersStack; sub ← subMap.asScala.valuesIterator) - errorPublisher.subscribe(sub) + doSubscribe(errorPublisher, sub) for (pubMap ← publishersStack; pub ← pubMap.asScala.valuesIterator) pub.subscribe(new CancellingSubscriber) @@ -680,12 +853,12 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo ret } - final protected def assignPort(in: InPort, subscriber: Subscriber[Any]): Unit = { - subscribers.put(in, subscriber) + final protected def assignPort(in: InPort, subscriberOrVirtual: AnyRef): Unit = { + subscribers.put(in, subscriberOrVirtual) // Interface (unconnected) ports of the current scope will be wired when exiting the scope if (!currentLayout.inPorts(in)) { val publisher = publishers.get(currentLayout.upstreams(in)) - if (publisher ne null) publisher.subscribe(subscriber) + if (publisher ne null) doSubscribe(publisher, subscriberOrVirtual) } } @@ -694,8 +867,14 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo // Interface (unconnected) ports of the current scope will be wired when exiting the scope if (!currentLayout.outPorts(out)) { val subscriber = subscribers.get(currentLayout.downstreams(out)) - if (subscriber ne null) publisher.subscribe(subscriber) + if (subscriber ne null) doSubscribe(publisher, subscriber) } } + private def doSubscribe(publisher: Publisher[_ <: Any], subscriberOrVirtual: AnyRef): Unit = + subscriberOrVirtual match { + case s: Subscriber[_] => publisher.subscribe(s.asInstanceOf[Subscriber[Any]]) + case v: VirtualPublisher[_] => v.registerPublisher(publisher) + } + }