Subscribing twice should cancel second subscriber #24719

This commit is contained in:
Johan Andrén 2018-03-15 13:55:33 +01:00 committed by GitHub
parent 38c0bbef7c
commit a53a09e6ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 1 deletions

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.{ Sink, Source }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.duration._
import scala.concurrent.{ Await, Promise }
class FlatMapConcatDoubleSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
def createSubscriber(): Subscriber[Int] = {
val subscriber = Promise[Subscriber[Int]]()
Source.single(Source.fromPublisher(new Publisher[Int] {
def subscribe(s: Subscriber[_ >: Int]): Unit =
subscriber.success(s.asInstanceOf[Subscriber[Int]])
})).flatMapConcat(identity).runWith(Sink.ignore)
Await.result(subscriber.future, 1.second)
}
def createElement(element: Int): Int = element
}

View file

@ -7,6 +7,7 @@ package akka.stream.impl.fusing
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.stream._
import akka.stream.impl.ReactiveStreamsCompliance.SpecViolation
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
@ -16,7 +17,7 @@ import akka.stream.testkit.Utils._
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.testkit.{ EventFilter, TestLatch }
import scala.concurrent.Await
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
@ -427,6 +428,27 @@ class ActorGraphInterpreterSpec extends StreamSpec {
propagatedError shouldBe an[AbruptTerminationException]
}
// reproduces #24719
"not allow a second subscriber" in {
val done = Promise[Done]()
Source.single(Source.fromPublisher(new Publisher[Int] {
def subscribe(s: Subscriber[_ >: Int]): Unit = {
s.onSubscribe(new Subscription {
def cancel(): Unit = ()
def request(n: Long): Unit = ()
})
// reactive streams 2.5 - must cancel if called with onSubscribe when already have one running
s.onSubscribe(new Subscription {
def cancel(): Unit =
done.trySuccess(Done)
def request(n: Long): Unit =
done.tryFailure(new IllegalStateException("request should not have been invoked"))
})
}
})).flatMapConcat(identity).runWith(Sink.ignore)
done.future.futureValue // would throw on failure
}
}
}

View file

@ -205,6 +205,8 @@ import scala.util.control.NonFatal
} else if (downstreamCanceled) {
upstreamCompleted = true
tryCancel(subscription)
} else if (upstream != null) { // reactive streams spec 2.5
tryCancel(subscription)
} else {
upstream = subscription
// Prefetch