diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSourceSpec.scala index 29d2d82aa1..4c646d4cf7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSourceSpec.scala @@ -3,9 +3,9 @@ */ package akka.stream.scaladsl -import akka.stream.ActorMaterializer +import akka.stream.{ StreamTcpException, ActorMaterializer } -import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec } import akka.stream.testkit.Utils._ import scala.concurrent.duration._ @@ -24,6 +24,24 @@ class SubscriberSourceSpec extends AkkaSpec("akka.loglevel=DEBUG\nakka.actor.deb Await.result(f, 3.seconds) should be(6) } + + "throw exception if subscribe more then once" in { + val sub = TestSubscriber.manualProbe[String] + val sub1 = Source.subscriber[String].to(Sink(sub)).run() + val source = Source.apply(List("1", "2", "3")) + + val pub1 = source.runWith(Sink.publisher) + val pub2 = source.runWith(Sink.publisher) + + pub1.subscribe(sub1) + val s = sub.expectSubscription() + + a[IllegalStateException] should be thrownBy pub2.subscribe(sub1) + + s.request(3) + sub.expectNext("1", "2", "3") + sub.expectComplete() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index d84da3721c..ef9bced8f2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -360,6 +360,8 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] { private val subscriptionStatus = new AtomicReference[AnyRef] private val terminationStatus = new AtomicReference[Termination] + private[this] def isCancelled() = subscriptionStatus.get == InertSubscriber + override def subscribe(s: Subscriber[_ >: T]): Unit = { requireNonNullSubscriber(s) if (subscriptionStatus.compareAndSet(null, s)) () // wait for onSubscribe @@ -367,19 +369,21 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] { subscriptionStatus.get match { case sub: Subscriber[_] ⇒ rejectAdditionalSubscriber(s, "VirtualProcessor") case sub: Sub ⇒ - try { - subscriptionStatus.set(s) - tryOnSubscribe(s, sub) + subscriptionStatus.set(s) + try tryOnSubscribe(s, sub) catch { case NonFatal(ex) ⇒ sub.cancel(); return } + if (!isCancelled) { sub.closeLatch() // allow onNext only now - terminationStatus.getAndSet(Allowed) match { - case null ⇒ // nothing happened yet - case Completed ⇒ tryOnComplete(s) - case Failed(ex) ⇒ tryOnError(s, ex) - case Allowed ⇒ // all good + try { + terminationStatus.getAndSet(Allowed) match { + case null ⇒ // nothing happened yet + case Completed ⇒ tryOnComplete(s) + case Failed(ex) ⇒ tryOnError(s, ex) + case Allowed ⇒ // all good + } + } catch { + case NonFatal(ex) ⇒ sub.cancel() } - } catch { - case NonFatal(ex) ⇒ sub.cancel() - } + } else throw new IllegalStateException("Cannot be subscribed more then once") } }