Revert "+str #18486 Source.subscriber's Subscriber throws if subscribed more than once"
This commit is contained in:
parent
b374331b20
commit
a4db27f6ed
2 changed files with 3 additions and 24 deletions
|
|
@ -360,8 +360,6 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
|
|||
private val subscriptionStatus = new AtomicReference[AnyRef]
|
||||
private val terminationStatus = new AtomicReference[Termination]
|
||||
|
||||
private[this] def isCancelled() = subscriptionStatus.get == InertSubscriber
|
||||
|
||||
override def subscribe(s: Subscriber[_ >: T]): Unit = {
|
||||
requireNonNullSubscriber(s)
|
||||
if (subscriptionStatus.compareAndSet(null, s)) () // wait for onSubscribe
|
||||
|
|
@ -372,7 +370,6 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
|
|||
try {
|
||||
subscriptionStatus.set(s)
|
||||
tryOnSubscribe(s, sub)
|
||||
if (isCancelled) throw canNotSubscribeTheSameSubscriberMultipleTimesException
|
||||
sub.closeLatch() // allow onNext only now
|
||||
terminationStatus.getAndSet(Allowed) match {
|
||||
case null ⇒ // nothing happened yet
|
||||
|
|
@ -381,7 +378,7 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
|
|||
case Allowed ⇒ // all good
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒ if (isCancelled) throw ex else sub.cancel()
|
||||
case NonFatal(ex) ⇒ sub.cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue