+str #18486 Make sure that Source.subscriber's Subscriber throws if subscribed more than once

This commit is contained in:
Alexander Golubev 2015-09-19 23:59:06 -04:00
parent e713591e5f
commit 99df500ff2
2 changed files with 35 additions and 13 deletions

View file

@ -360,6 +360,8 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
private val subscriptionStatus = new AtomicReference[AnyRef]
private val terminationStatus = new AtomicReference[Termination]
private[this] def isCancelled() = subscriptionStatus.get == InertSubscriber
override def subscribe(s: Subscriber[_ >: T]): Unit = {
requireNonNullSubscriber(s)
if (subscriptionStatus.compareAndSet(null, s)) () // wait for onSubscribe
@ -367,19 +369,21 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
subscriptionStatus.get match {
case sub: Subscriber[_] rejectAdditionalSubscriber(s, "VirtualProcessor")
case sub: Sub
try {
subscriptionStatus.set(s)
tryOnSubscribe(s, sub)
subscriptionStatus.set(s)
try tryOnSubscribe(s, sub) catch { case NonFatal(ex) sub.cancel(); return }
if (!isCancelled) {
sub.closeLatch() // allow onNext only now
terminationStatus.getAndSet(Allowed) match {
case null // nothing happened yet
case Completed tryOnComplete(s)
case Failed(ex) tryOnError(s, ex)
case Allowed // all good
try {
terminationStatus.getAndSet(Allowed) match {
case null // nothing happened yet
case Completed tryOnComplete(s)
case Failed(ex) tryOnError(s, ex)
case Allowed // all good
}
} catch {
case NonFatal(ex) sub.cancel()
}
} catch {
case NonFatal(ex) sub.cancel()
}
} else throw new IllegalStateException("Cannot be subscribed more then once")
}
}