From 38170e52c9b26429f14974f608cdd92eb4c44917 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 5 Jun 2014 14:44:46 +0200 Subject: [PATCH] =str #15355: fix race with ExposedPublisher messaage in tee - this change makes the secondary subscription lazy --- .../akka/stream/impl/ActorProcessor.scala | 2 +- .../akka/stream/impl/StaticFanouts.scala | 29 ++++--------------- .../test/scala/akka/stream/FlowTeeSpec.scala | 17 ----------- 3 files changed, 7 insertions(+), 41 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 528de30d25..29953f47b5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -197,7 +197,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu override protected def requestFromUpstream(elements: Int): Unit = downstreamBufferSpace += elements private def subscribePending(): Unit = - exposedPublisher.takePendingSubscribers() foreach super.registerSubscriber + exposedPublisher.takePendingSubscribers() foreach registerSubscriber override protected def shutdown(completed: Boolean): Unit = { if (exposedPublisher ne null) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala index 9438058215..18951e745a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala @@ -16,31 +16,14 @@ private[akka] class TeeImpl(_settings: MaterializerSettings, other: Consumer[Any extends ActorProcessorImpl(_settings) { override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, pump = this) { - - var hasOtherSubscription = false - var hasDownstreamSubscription = false - var pendingRemoveSubscription: List[S] = Nil - - registerSubscriber(other.getSubscriber) + var secondarySubscribed = false override def registerSubscriber(subscriber: Subscriber[Any]): Unit = { - super.registerSubscriber(subscriber) - if (subscriber == other.getSubscriber) - hasOtherSubscription = true - else - hasDownstreamSubscription = true - if (pendingRemoveSubscription.nonEmpty && hasOtherSubscription && hasDownstreamSubscription) { - pendingRemoveSubscription foreach unregisterSubscription - pendingRemoveSubscription = Nil + if (!secondarySubscribed) { + super.registerSubscriber(other.getSubscriber) + secondarySubscribed = true } - } - - override def unregisterSubscription(subscription: S): Unit = { - // make sure that we don't shutdown because of premature cancel - if (hasOtherSubscription && hasDownstreamSubscription) - super.unregisterSubscription(subscription) - else - pendingRemoveSubscription :+= subscription // defer these until both subscriptions have been registered + super.registerSubscriber(subscriber) } override def afterShutdown(): Unit = { @@ -49,7 +32,7 @@ private[akka] class TeeImpl(_settings: MaterializerSettings, other: Consumer[Any } } - var running = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + val running = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ val in = primaryInputs.dequeueInputElement() primaryOutputs.enqueueOutputElement(in) } diff --git a/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala index 782c12bd9a..c768205a1e 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala @@ -79,23 +79,6 @@ class FlowTeeSpec extends AkkaSpec { c2.expectComplete() } - "produce to downstream even though other cancels before downstream has subscribed" in { - val c1 = StreamTestKit.consumerProbe[Int] - val c2 = StreamTestKit.consumerProbe[Int] - val p = Flow(List(1, 2, 3)). - tee(c1). - toProducer(materializer) - val sub1 = c1.expectSubscription() - sub1.cancel() - p.produceTo(c2) - val sub2 = c2.expectSubscription() - sub2.requestMore(3) - c2.expectNext(1) - c2.expectNext(2) - c2.expectNext(3) - c2.expectComplete() - } - } } \ No newline at end of file