diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSourceSpec.scala index 4c646d4cf7..29d2d82aa1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubscriberSourceSpec.scala @@ -3,9 +3,9 @@ */ package akka.stream.scaladsl -import akka.stream.{ StreamTcpException, ActorMaterializer } +import akka.stream.ActorMaterializer -import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec } +import akka.stream.testkit.AkkaSpec import akka.stream.testkit.Utils._ import scala.concurrent.duration._ @@ -24,24 +24,6 @@ class SubscriberSourceSpec extends AkkaSpec("akka.loglevel=DEBUG\nakka.actor.deb Await.result(f, 3.seconds) should be(6) } - - "throw exception if subscribe more then once" in { - val sub = TestSubscriber.manualProbe[String] - val sub1 = Source.subscriber[String].to(Sink(sub)).run() - val source = Source.apply(List("1", "2", "3")) - - val pub1 = source.runWith(Sink.publisher) - val pub2 = source.runWith(Sink.publisher) - - pub1.subscribe(sub1) - val s = sub.expectSubscription() - - a[IllegalStateException] should be thrownBy pub2.subscribe(sub1) - - s.request(3) - sub.expectNext("1", "2", "3") - sub.expectComplete() - } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index b00b4b0a8f..d84da3721c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -360,8 +360,6 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] { private val subscriptionStatus = new AtomicReference[AnyRef] private val terminationStatus = new AtomicReference[Termination] - private[this] def isCancelled() = subscriptionStatus.get == InertSubscriber - override def subscribe(s: Subscriber[_ >: T]): Unit = { requireNonNullSubscriber(s) if (subscriptionStatus.compareAndSet(null, s)) () // wait for onSubscribe @@ -372,7 +370,6 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] { try { subscriptionStatus.set(s) tryOnSubscribe(s, sub) - if (isCancelled) throw canNotSubscribeTheSameSubscriberMultipleTimesException sub.closeLatch() // allow onNext only now terminationStatus.getAndSet(Allowed) match { case null ⇒ // nothing happened yet @@ -381,7 +378,7 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] { case Allowed ⇒ // all good } } catch { - case NonFatal(ex) ⇒ if (isCancelled) throw ex else sub.cancel() + case NonFatal(ex) ⇒ sub.cancel() } } }