Merge pull request #15367 from drewhk/wip-15355-tee-first-message-race-drewhk

=str #15355 Fix race in Tee causing a RequestMore overtaking ExposedPublisher
This commit is contained in:
drewhk 2014-06-14 12:43:17 +02:00
commit 12b6ee9c46
3 changed files with 7 additions and 41 deletions

View file

@ -197,7 +197,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
override protected def requestFromUpstream(elements: Int): Unit = downstreamBufferSpace += elements
private def subscribePending(): Unit =
exposedPublisher.takePendingSubscribers() foreach super.registerSubscriber
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
override protected def shutdown(completed: Boolean): Unit = {
if (exposedPublisher ne null) {

View file

@ -16,31 +16,14 @@ private[akka] class TeeImpl(_settings: MaterializerSettings, other: Consumer[Any
extends ActorProcessorImpl(_settings) {
override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, pump = this) {
var hasOtherSubscription = false
var hasDownstreamSubscription = false
var pendingRemoveSubscription: List[S] = Nil
registerSubscriber(other.getSubscriber)
var secondarySubscribed = false
override def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
super.registerSubscriber(subscriber)
if (subscriber == other.getSubscriber)
hasOtherSubscription = true
else
hasDownstreamSubscription = true
if (pendingRemoveSubscription.nonEmpty && hasOtherSubscription && hasDownstreamSubscription) {
pendingRemoveSubscription foreach unregisterSubscription
pendingRemoveSubscription = Nil
if (!secondarySubscribed) {
super.registerSubscriber(other.getSubscriber)
secondarySubscribed = true
}
}
override def unregisterSubscription(subscription: S): Unit = {
// make sure that we don't shutdown because of premature cancel
if (hasOtherSubscription && hasDownstreamSubscription)
super.unregisterSubscription(subscription)
else
pendingRemoveSubscription :+= subscription // defer these until both subscriptions have been registered
super.registerSubscriber(subscriber)
}
override def afterShutdown(): Unit = {
@ -49,7 +32,7 @@ private[akka] class TeeImpl(_settings: MaterializerSettings, other: Consumer[Any
}
}
var running = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
val running = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
val in = primaryInputs.dequeueInputElement()
primaryOutputs.enqueueOutputElement(in)
}

View file

@ -79,23 +79,6 @@ class FlowTeeSpec extends AkkaSpec {
c2.expectComplete()
}
"produce to downstream even though other cancels before downstream has subscribed" in {
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
val p = Flow(List(1, 2, 3)).
tee(c1).
toProducer(materializer)
val sub1 = c1.expectSubscription()
sub1.cancel()
p.produceTo(c2)
val sub2 = c2.expectSubscription()
sub2.requestMore(3)
c2.expectNext(1)
c2.expectNext(2)
c2.expectNext(3)
c2.expectComplete()
}
}
}