=str #16986 Fix memory leak in PrefixAndTail when using Sink.publisher

The problem was reproduced with the TCK PrefixAndTailTest
required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber
The tck subscriber was still referenced. Profiling revealed that the
root cause was the VirtualPublisher that holds a reference to the
realPublisher, which was MultiStreamOutputProcessor$SubstreamOutput,
which had the reference to the subscriber. The VirtualPublisher
is created by the Sink.publisher in the test, and the test holds
on to that VirtualPublisher reference.

The solution is to null out realPublisher field in the VirtualPublisher.

The old workaround with the NullSubscriber was removed.

Also made Sink.publisher reject additional subscribers.
This commit is contained in:
Patrik Nordwall 2015-04-16 16:05:49 +02:00
parent 050c0549f3
commit 2a975bfb35
7 changed files with 32 additions and 38 deletions

View file

@ -67,7 +67,7 @@ class FlowFromFutureSpec extends AkkaSpec {
"produce elements with multiple subscribers" in {
val promise = Promise[Int]()
val p = Source(promise.future).runWith(Sink.publisher)
val p = Source(promise.future).runWith(Sink.fanoutPublisher(1, 1))
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -83,30 +83,9 @@ class FlowFromFutureSpec extends AkkaSpec {
c2.expectComplete()
}
"produce elements to later subscriber" in {
val promise = Promise[Int]()
val p = Source(promise.future).runWith(Sink.publisher)
val keepAlive = StreamTestKit.SubscriberProbe[Int]()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(keepAlive)
p.subscribe(c1)
val sub1 = c1.expectSubscription()
sub1.request(1)
promise.success(1)
c1.expectNext(1)
c1.expectComplete()
p.subscribe(c2)
val sub2 = c2.expectSubscription()
sub2.request(1)
c2.expectNext(1)
c2.expectComplete()
}
"allow cancel before receiving element" in {
val promise = Promise[Int]()
val p = Source(promise.future).runWith(Sink.publisher)
val p = Source(promise.future).runWith(Sink.fanoutPublisher(1, 1))
val keepAlive = StreamTestKit.SubscriberProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(keepAlive)

View file

@ -13,12 +13,13 @@ import akka.stream.testkit.StreamTestKit
import akka.stream.impl.PublisherSource
import akka.stream.testkit.StreamTestKit.PublisherProbe
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.impl.ReactiveStreamsCompliance
class SourceSpec extends AkkaSpec {
implicit val materializer = ActorFlowMaterializer()
"Singleton Source" must {
"Single Source" must {
"produce element" in {
val p = Source.single(1).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
@ -29,7 +30,7 @@ class SourceSpec extends AkkaSpec {
c.expectComplete()
}
"produce elements to later subscriber" in {
"reject later subscriber" in {
val p = Source.single(1).runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
@ -39,11 +40,9 @@ class SourceSpec extends AkkaSpec {
sub1.request(1)
c1.expectNext(1)
c1.expectComplete()
p.subscribe(c2)
val sub2 = c2.expectSubscription()
sub2.request(3)
c2.expectNext(1)
c2.expectComplete()
c2.expectSubscriptionAndError()
}
}
@ -55,9 +54,10 @@ class SourceSpec extends AkkaSpec {
p.subscribe(c)
c.expectSubscriptionAndComplete()
// reject additional subscriber
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectSubscriptionAndComplete()
c2.expectSubscriptionAndError()
}
}
@ -69,9 +69,10 @@ class SourceSpec extends AkkaSpec {
p.subscribe(c)
c.expectSubscriptionAndError(ex)
// reject additional subscriber
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectSubscriptionAndError(ex)
c2.expectSubscriptionAndError()
}
}

View file

@ -126,6 +126,7 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF
}
val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
val publisher = new ActorPublisher[Any](impl)
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(publisher)
for ((in, id) inputs.zipWithIndex) {
assignPort(in, FanIn.SubInput[Any](impl, id))
@ -281,6 +282,7 @@ private[akka] object ActorProcessorFactory {
def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
val p = new ActorProcessor[I, O](impl)
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
p
}

View file

@ -18,6 +18,7 @@ private[akka] object ActorProcessor {
def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
val p = new ActorProcessor[I, O](impl)
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
p
}

View file

@ -53,9 +53,15 @@ private[akka] case object CancelledSubscription extends Subscription {
/**
* INTERNAL API
*/
private[akka] case object NullSubscriber extends Subscriber[Any] {
def onComplete(): Unit = ()
def onError(cause: Throwable): Unit = ()
def onNext(elem: Any): Unit = ()
def onSubscribe(s: Subscription): Unit = ()
private[akka] case object RejectAdditionalSubscibers extends Publisher[Nothing] {
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
try {
ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "Publisher")
} catch {
case _: SpecViolation // nothing we can do
}
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
override def toString: String = "already-subscribed-publisher"
}

View file

@ -311,7 +311,13 @@ private[stream] class VirtualSubscriber[T](val owner: VirtualPublisher[T]) exten
*/
private[stream] class VirtualPublisher[T]() extends Publisher[T] {
@volatile var realPublisher: Publisher[T] = null
override def subscribe(s: Subscriber[_ >: T]): Unit = realPublisher.subscribe(s)
override def subscribe(s: Subscriber[_ >: T]): Unit = {
val sub = realPublisher.subscribe(s)
// unreference the realPublisher to facilitate GC and
// Sink.publisher is supposed to reject additional subscribers anyway
realPublisher = RejectAdditionalSubscibers[T]
sub
}
}
/**

View file

@ -68,7 +68,6 @@ private[akka] object MultiStreamOutputProcessor {
override def cancel(): Unit = {
if (!downstreamCompleted) {
closePublisher(Cancelled)
subscriber = NullSubscriber // FIXME unreference real subscriber, should not be needed after #16986
downstreamCompleted = true
}
}