From 2a975bfb358e2751eadd624ba65e824bb0567ffb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 16 Apr 2015 16:05:49 +0200 Subject: [PATCH] =str #16986 Fix memory leak in PrefixAndTail when using Sink.publisher The problem was reproduced with the TCK PrefixAndTailTest required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber The tck subscriber was still referenced. Profiling revealed that the root cause was the VirtualPublisher that holds a reference to the realPublisher, which was MultiStreamOutputProcessor$SubstreamOutput, which had the reference to the subscriber. The VirtualPublisher is created by the Sink.publisher in the test, and the test holds on to that VirtualPublisher reference. The solution is to null out realPublisher field in the VirtualPublisher. The old workaround with the NullSubscriber was removed. Also made Sink.publisher reject additional subscribers. --- .../stream/scaladsl/FlowFromFutureSpec.scala | 25 ++----------------- .../akka/stream/scaladsl/SourceSpec.scala | 17 +++++++------ .../impl/ActorFlowMaterializerImpl.scala | 2 ++ .../akka/stream/impl/ActorProcessor.scala | 1 + .../stream/impl/CompletedPublishers.scala | 16 ++++++++---- .../scala/akka/stream/impl/StreamLayout.scala | 8 +++++- .../impl/StreamOfStreamProcessors.scala | 1 - 7 files changed, 32 insertions(+), 38 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index 9572eec255..1868e429af 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -67,7 +67,7 @@ class FlowFromFutureSpec extends AkkaSpec { "produce elements with multiple subscribers" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) + val p = Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -83,30 +83,9 @@ class FlowFromFutureSpec extends AkkaSpec { c2.expectComplete() } - "produce elements to later subscriber" in { - val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) - val keepAlive = StreamTestKit.SubscriberProbe[Int]() - val c1 = StreamTestKit.SubscriberProbe[Int]() - val c2 = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(keepAlive) - p.subscribe(c1) - - val sub1 = c1.expectSubscription() - sub1.request(1) - promise.success(1) - c1.expectNext(1) - c1.expectComplete() - p.subscribe(c2) - val sub2 = c2.expectSubscription() - sub2.request(1) - c2.expectNext(1) - c2.expectComplete() - } - "allow cancel before receiving element" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) + val p = Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)) val keepAlive = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(keepAlive) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 2356445d51..2135173566 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -13,12 +13,13 @@ import akka.stream.testkit.StreamTestKit import akka.stream.impl.PublisherSource import akka.stream.testkit.StreamTestKit.PublisherProbe import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.impl.ReactiveStreamsCompliance class SourceSpec extends AkkaSpec { implicit val materializer = ActorFlowMaterializer() - "Singleton Source" must { + "Single Source" must { "produce element" in { val p = Source.single(1).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() @@ -29,7 +30,7 @@ class SourceSpec extends AkkaSpec { c.expectComplete() } - "produce elements to later subscriber" in { + "reject later subscriber" in { val p = Source.single(1).runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() @@ -39,11 +40,9 @@ class SourceSpec extends AkkaSpec { sub1.request(1) c1.expectNext(1) c1.expectComplete() + p.subscribe(c2) - val sub2 = c2.expectSubscription() - sub2.request(3) - c2.expectNext(1) - c2.expectComplete() + c2.expectSubscriptionAndError() } } @@ -55,9 +54,10 @@ class SourceSpec extends AkkaSpec { p.subscribe(c) c.expectSubscriptionAndComplete() + // reject additional subscriber val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c2) - c2.expectSubscriptionAndComplete() + c2.expectSubscriptionAndError() } } @@ -69,9 +69,10 @@ class SourceSpec extends AkkaSpec { p.subscribe(c) c.expectSubscriptionAndError(ex) + // reject additional subscriber val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c2) - c2.expectSubscriptionAndError(ex) + c2.expectSubscriptionAndError() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 95de18ec38..9d6b9cdc5b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -126,6 +126,7 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF } val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) val publisher = new ActorPublisher[Any](impl) + // Resolve cyclic dependency with actor. This MUST be the first message no matter what. impl ! ExposedPublisher(publisher) for ((in, id) ← inputs.zipWithIndex) { assignPort(in, FanIn.SubInput[Any](impl, id)) @@ -281,6 +282,7 @@ private[akka] object ActorProcessorFactory { def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { val p = new ActorProcessor[I, O](impl) + // Resolve cyclic dependency with actor. This MUST be the first message no matter what. impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) p } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 16e8b838f8..b8f5c29f2e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -18,6 +18,7 @@ private[akka] object ActorProcessor { def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { val p = new ActorProcessor[I, O](impl) + // Resolve cyclic dependency with actor. This MUST be the first message no matter what. impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) p } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 8f5fec2b55..b50545724e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -53,9 +53,15 @@ private[akka] case object CancelledSubscription extends Subscription { /** * INTERNAL API */ -private[akka] case object NullSubscriber extends Subscriber[Any] { - def onComplete(): Unit = () - def onError(cause: Throwable): Unit = () - def onNext(elem: Any): Unit = () - def onSubscribe(s: Subscription): Unit = () +private[akka] case object RejectAdditionalSubscibers extends Publisher[Nothing] { + import ReactiveStreamsCompliance._ + override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = + try { + ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "Publisher") + } catch { + case _: SpecViolation ⇒ // nothing we can do + } + def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] + override def toString: String = "already-subscribed-publisher" } + diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index f6001f5e6b..e8eb61a230 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -311,7 +311,13 @@ private[stream] class VirtualSubscriber[T](val owner: VirtualPublisher[T]) exten */ private[stream] class VirtualPublisher[T]() extends Publisher[T] { @volatile var realPublisher: Publisher[T] = null - override def subscribe(s: Subscriber[_ >: T]): Unit = realPublisher.subscribe(s) + override def subscribe(s: Subscriber[_ >: T]): Unit = { + val sub = realPublisher.subscribe(s) + // unreference the realPublisher to facilitate GC and + // Sink.publisher is supposed to reject additional subscribers anyway + realPublisher = RejectAdditionalSubscibers[T] + sub + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index b35065503d..555deea567 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -68,7 +68,6 @@ private[akka] object MultiStreamOutputProcessor { override def cancel(): Unit = { if (!downstreamCompleted) { closePublisher(Cancelled) - subscriber = NullSubscriber // FIXME unreference real subscriber, should not be needed after #16986 downstreamCompleted = true } }