From 5f2e50e416f7a4c7544c39ee28543470ad77cd47 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Mon, 21 Sep 2015 09:48:22 -0400 Subject: [PATCH] +str #18486 Make sure that Source.subscriber's Subscriber throws if subscribed more than once --- .../scala/akka/stream/impl/StreamLayout.scala | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index ef9bced8f2..59dbc6a62d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -369,21 +369,22 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] { subscriptionStatus.get match { case sub: Subscriber[_] ⇒ rejectAdditionalSubscriber(s, "VirtualProcessor") case sub: Sub ⇒ - subscriptionStatus.set(s) - try tryOnSubscribe(s, sub) catch { case NonFatal(ex) ⇒ sub.cancel(); return } - if (!isCancelled) { + try { + subscriptionStatus.set(s) + tryOnSubscribe(s, sub) + if (isCancelled) throw canNotSubscribeTheSameSubscriberMultipleTimesException sub.closeLatch() // allow onNext only now - try { - terminationStatus.getAndSet(Allowed) match { - case null ⇒ // nothing happened yet - case Completed ⇒ tryOnComplete(s) - case Failed(ex) ⇒ tryOnError(s, ex) - case Allowed ⇒ // all good - } - } catch { - case NonFatal(ex) ⇒ sub.cancel() + terminationStatus.getAndSet(Allowed) match { + case null ⇒ // nothing happened yet + case Completed ⇒ tryOnComplete(s) + case Failed(ex) ⇒ tryOnError(s, ex) + case Allowed ⇒ // all good + } - } else throw new IllegalStateException("Cannot be subscribed more then once") + } catch { + case ex @ canNotSubscribeTheSameSubscriberMultipleTimesException ⇒ throw ex + case NonFatal(ex) ⇒ sub.cancel() + } } }