diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapConcatDoubleSubscriberTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapConcatDoubleSubscriberTest.scala new file mode 100644 index 0000000000..267de3c81f --- /dev/null +++ b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FlatMapConcatDoubleSubscriberTest.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2015-2018 Lightbend Inc. + */ + +package akka.stream.tck + +import akka.stream.scaladsl.{ Sink, Source } +import org.reactivestreams.{ Publisher, Subscriber } + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Promise } + +class FlatMapConcatDoubleSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { + + def createSubscriber(): Subscriber[Int] = { + val subscriber = Promise[Subscriber[Int]]() + Source.single(Source.fromPublisher(new Publisher[Int] { + def subscribe(s: Subscriber[_ >: Int]): Unit = + subscriber.success(s.asInstanceOf[Subscriber[Int]]) + })).flatMapConcat(identity).runWith(Sink.ignore) + + Await.result(subscriber.future, 1.second) + } + + def createElement(element: Int): Int = element +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index 0b71336c70..a11403525c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -7,6 +7,7 @@ package akka.stream.impl.fusing import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger +import akka.Done import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance.SpecViolation import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage @@ -16,7 +17,7 @@ import akka.stream.testkit.Utils._ import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.testkit.{ EventFilter, TestLatch } -import scala.concurrent.Await +import scala.concurrent.{ Await, Promise } import scala.concurrent.duration._ import org.reactivestreams.{ Publisher, Subscriber, Subscription } @@ -427,6 +428,27 @@ class ActorGraphInterpreterSpec extends StreamSpec { propagatedError shouldBe an[AbruptTerminationException] } + // reproduces #24719 + "not allow a second subscriber" in { + val done = Promise[Done]() + Source.single(Source.fromPublisher(new Publisher[Int] { + def subscribe(s: Subscriber[_ >: Int]): Unit = { + s.onSubscribe(new Subscription { + def cancel(): Unit = () + def request(n: Long): Unit = () + }) + // reactive streams 2.5 - must cancel if called with onSubscribe when already have one running + s.onSubscribe(new Subscription { + def cancel(): Unit = + done.trySuccess(Done) + def request(n: Long): Unit = + done.tryFailure(new IllegalStateException("request should not have been invoked")) + }) + } + })).flatMapConcat(identity).runWith(Sink.ignore) + done.future.futureValue // would throw on failure + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index a8e84b185b..5798a899d6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -205,6 +205,8 @@ import scala.util.control.NonFatal } else if (downstreamCanceled) { upstreamCompleted = true tryCancel(subscription) + } else if (upstream != null) { // reactive streams spec 2.5 + tryCancel(subscription) } else { upstream = subscription // Prefetch