diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index 9572eec255..1868e429af 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -67,7 +67,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce elements with multiple subscribers" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) + val p = Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -83,30 +83,9 @@ class FlowFromFutureSpec extends AkkaSpec { c2.expectComplete() } - "produce elements to later subscriber" in { - val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) - val keepAlive = StreamTestKit.SubscriberProbe[Int]() - val c1 = StreamTestKit.SubscriberProbe[Int]() - val c2 = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(keepAlive) - p.subscribe(c1) - - val sub1 = c1.expectSubscription() - sub1.request(1) - promise.success(1) - c1.expectNext(1) - c1.expectComplete() - p.subscribe(c2) - val sub2 = c2.expectSubscription() - sub2.request(1) - c2.expectNext(1) - c2.expectComplete() - } - "allow cancel before receiving element" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) + val p = Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)) val keepAlive = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(keepAlive) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 2356445d51..2135173566 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -13,12 +13,13 @@ import akka.stream.testkit.StreamTestKit import akka.stream.impl.PublisherSource import akka.stream.testkit.StreamTestKit.PublisherProbe import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.impl.ReactiveStreamsCompliance class SourceSpec extends AkkaSpec { implicit val materializer = ActorFlowMaterializer() - "Singleton Source" must { + "Single Source" must { "produce element" in { val p = Source.single(1).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() @@ -29,7 +30,7 @@ class SourceSpec extends AkkaSpec { c.expectComplete() } - "produce elements to later subscriber" in { + "reject later subscriber" in { val p = Source.single(1).runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -39,11 +40,9 @@ class SourceSpec extends AkkaSpec { sub1.request(1) c1.expectNext(1) c1.expectComplete() + p.subscribe(c2) - val sub2 = c2.expectSubscription() - sub2.request(3) - c2.expectNext(1) - c2.expectComplete() + c2.expectSubscriptionAndError() } } @@ -55,9 +54,10 @@ class SourceSpec extends AkkaSpec { p.subscribe(c) c.expectSubscriptionAndComplete() + // reject additional subscriber val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c2) - c2.expectSubscriptionAndComplete() + c2.expectSubscriptionAndError() } } @@ -69,9 +69,10 @@ class SourceSpec extends AkkaSpec { p.subscribe(c) c.expectSubscriptionAndError(ex) + // reject additional subscriber val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c2) - c2.expectSubscriptionAndError(ex) + c2.expectSubscriptionAndError() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 95de18ec38..9d6b9cdc5b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -126,6 +126,7 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF } val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) val publisher = new ActorPublisher[Any](impl) + // Resolve cyclic dependency with actor. This MUST be the first message no matter what. impl ! ExposedPublisher(publisher) for ((in, id) ← inputs.zipWithIndex) { assignPort(in, FanIn.SubInput[Any](impl, id)) @@ -281,6 +282,7 @@ private[akka] object ActorProcessorFactory { def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { val p = new ActorProcessor[I, O](impl) + // Resolve cyclic dependency with actor. This MUST be the first message no matter what. impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) p } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 16e8b838f8..b8f5c29f2e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -18,6 +18,7 @@ private[akka] object ActorProcessor { def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { val p = new ActorProcessor[I, O](impl) + // Resolve cyclic dependency with actor. This MUST be the first message no matter what. impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) p } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 8f5fec2b55..b50545724e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -53,9 +53,15 @@ private[akka] case object CancelledSubscription extends Subscription { /** * INTERNAL API */ -private[akka] case object NullSubscriber extends Subscriber[Any] { - def onComplete(): Unit = () - def onError(cause: Throwable): Unit = () - def onNext(elem: Any): Unit = () - def onSubscribe(s: Subscription): Unit = () +private[akka] case object RejectAdditionalSubscibers extends Publisher[Nothing] { + import ReactiveStreamsCompliance._ + override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = + try { + ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "Publisher") + } catch { + case _: SpecViolation ⇒ // nothing we can do + } + def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] + override def toString: String = "already-subscribed-publisher" } + diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index f6001f5e6b..e8eb61a230 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -311,7 +311,13 @@ private[stream] class VirtualSubscriber[T](val owner: VirtualPublisher[T]) exten */ private[stream] class VirtualPublisher[T]() extends Publisher[T] { @volatile var realPublisher: Publisher[T] = null - override def subscribe(s: Subscriber[_ >: T]): Unit = realPublisher.subscribe(s) + override def subscribe(s: Subscriber[_ >: T]): Unit = { + val sub = realPublisher.subscribe(s) + // unreference the realPublisher to facilitate GC and + // Sink.publisher is supposed to reject additional subscribers anyway + realPublisher = RejectAdditionalSubscibers[T] + sub + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index b35065503d..555deea567 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -68,7 +68,6 @@ private[akka] object MultiStreamOutputProcessor { override def cancel(): Unit = { if (!downstreamCompleted) { closePublisher(Cancelled) - subscriber = NullSubscriber // FIXME unreference real subscriber, should not be needed after #16986 downstreamCompleted = true } }