=str #16331 Use ReactiveStreamsCompliance utility everywhere

* if onSubscribe, onError, onComplete throws exception we must not call onError,
  see 2:13 https://github.com/reactive-streams/reactive-streams
This commit is contained in:
Patrik Nordwall 2015-02-03 11:34:11 +01:00
parent b442c4d9e6
commit 3876793988
15 changed files with 113 additions and 73 deletions

View file

@ -40,6 +40,7 @@ private[akka] object MultiStreamOutputProcessor {
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: Cancellable)
extends SimpleOutputs(actor, pump) with Publisher[Any] {
import ReactiveStreamsCompliance._
import SubstreamOutput._
@ -80,8 +81,9 @@ private[akka] object MultiStreamOutputProcessor {
}
private def closeSubscriber(s: Subscriber[Any], withState: CompletedState): Unit = withState match {
case Completed s.onComplete()
case Failed(e) s.onError(e)
case Completed tryOnComplete(s)
case Failed(e: SpecViolation) // nothing to do
case Failed(e) tryOnError(s, e)
}
override def subscribe(s: Subscriber[_ >: Any]): Unit = {
@ -89,9 +91,12 @@ private[akka] object MultiStreamOutputProcessor {
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber))
case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
case _: Attached
tryOnError(s, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber))
case c: CompletedState
closeSubscriber(s, c)
case Open
throw new IllegalStateException("Publisher cannot become open after being used before")
}
}
}
@ -99,8 +104,9 @@ private[akka] object MultiStreamOutputProcessor {
def attachSubscriber(s: Subscriber[Any]): Unit =
if (subscriber eq null) {
subscriber = s
subscriber.onSubscribe(subscription)
} else subscriber.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher"))
tryOnSubscribe(subscriber, subscription)
} else
tryOnError(subscriber, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber))
}
}