+str #18486 Make sure that Source.subscriber's Subscriber throws if subscribed more than once
This commit is contained in:
parent
99df500ff2
commit
5f2e50e416
1 changed files with 14 additions and 13 deletions
|
|
@ -369,21 +369,22 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
|
|||
subscriptionStatus.get match {
|
||||
case sub: Subscriber[_] ⇒ rejectAdditionalSubscriber(s, "VirtualProcessor")
|
||||
case sub: Sub ⇒
|
||||
subscriptionStatus.set(s)
|
||||
try tryOnSubscribe(s, sub) catch { case NonFatal(ex) ⇒ sub.cancel(); return }
|
||||
if (!isCancelled) {
|
||||
try {
|
||||
subscriptionStatus.set(s)
|
||||
tryOnSubscribe(s, sub)
|
||||
if (isCancelled) throw canNotSubscribeTheSameSubscriberMultipleTimesException
|
||||
sub.closeLatch() // allow onNext only now
|
||||
try {
|
||||
terminationStatus.getAndSet(Allowed) match {
|
||||
case null ⇒ // nothing happened yet
|
||||
case Completed ⇒ tryOnComplete(s)
|
||||
case Failed(ex) ⇒ tryOnError(s, ex)
|
||||
case Allowed ⇒ // all good
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒ sub.cancel()
|
||||
terminationStatus.getAndSet(Allowed) match {
|
||||
case null ⇒ // nothing happened yet
|
||||
case Completed ⇒ tryOnComplete(s)
|
||||
case Failed(ex) ⇒ tryOnError(s, ex)
|
||||
case Allowed ⇒ // all good
|
||||
|
||||
}
|
||||
} else throw new IllegalStateException("Cannot be subscribed more then once")
|
||||
} catch {
|
||||
case ex @ canNotSubscribeTheSameSubscriberMultipleTimesException ⇒ throw ex
|
||||
case NonFatal(ex) ⇒ sub.cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue