From 692e49efb28347ec50d0d1f1f23246371b146591 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 18 Nov 2014 23:19:30 +0100 Subject: [PATCH] =str - hardens the IteratorPublisher by making it use the ReactiveStreamsConstants --- .../stream/ReactiveStreamsConstants.scala | 13 +++-- .../akka/stream/impl/IteratorPublisher.scala | 49 ++++++++++--------- .../stream/scaladsl/ActorFlowSource.scala | 12 ++--- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala b/akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala index 1c8a412c76..f29d2aca0a 100644 --- a/akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala +++ b/akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala @@ -3,7 +3,7 @@ */ package akka.stream -import org.reactivestreams.{ Subscription, Subscriber } +import org.reactivestreams.{ Subscription, Subscriber, Publisher } import scala.util.control.NonFatal @@ -24,6 +24,9 @@ object ReactiveStreamsConstants { final def validateRequest(n: Long): Unit = if (n < 1) throw new IllegalArgumentException(NumberOfElementsInRequestMustBePositiveMsg) with SpecViolation + final def rejectAdditionalSubscriber[T](subsriber: Subscriber[T], rejector: Publisher[T]): Unit = + tryOnError(subsriber, new IllegalStateException(s"$rejector $SupportsOnlyASingleSubscriber")) + sealed trait SpecViolation { self: Throwable ⇒ def violation: Throwable = self // this method is needed because Scalac is not smart enough to handle it otherwise @@ -32,8 +35,12 @@ object ReactiveStreamsConstants { final class SignalThrewException(message: String, cause: Throwable) extends IllegalStateException(message, cause) with SpecViolation final def tryOnError[T](subscriber: Subscriber[T], error: Throwable): Unit = - try subscriber.onError(error) catch { - case NonFatal(t) ⇒ throw new SignalThrewException(subscriber + ".onError", t) + error match { + case sv: SpecViolation ⇒ throw new IllegalStateException("It is not legal to try to signal onError with a SpecViolation", sv.violation) + case other ⇒ + try subscriber.onError(other) catch { + case NonFatal(t) ⇒ throw new SignalThrewException(subscriber + ".onError", t) + } } final def tryOnNext[T](subscriber: Subscriber[T], element: T): Unit = diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala index 9584e7140d..d7a6ede7e5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala @@ -22,11 +22,12 @@ private[akka] object IteratorPublisher { private case object PushMore private sealed trait State + private sealed trait StopState extends State private case object Unitialized extends State private case object Initialized extends State - private case object Cancelled extends State - private case object Completed extends State - private case class Errored(cause: Throwable) extends State + private case object Cancelled extends StopState + private case object Completed extends StopState + private case class Errored(cause: Throwable) extends StopState } /** @@ -35,18 +36,20 @@ private[akka] object IteratorPublisher { */ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: MaterializerSettings) extends Actor { import IteratorPublisher._ + import ReactiveStreamsConstants._ private var exposedPublisher: ActorPublisher[Any] = _ private var subscriber: Subscriber[Any] = _ private var downstreamDemand: Long = 0L private var state: State = Unitialized - private val maxPush = settings.maxInputBufferSize + private val maxPush = settings.maxInputBufferSize // FIXME why is this a good number? def receive = { case ExposedPublisher(publisher) ⇒ exposedPublisher = publisher context.become(waitingForFirstSubscriber) - case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") + case _ ⇒ + throw new IllegalStateException("The first message must be ExposedPublisher") } def waitingForFirstSubscriber: Receive = { @@ -64,11 +67,9 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia def active: Receive = { case RequestMore(_, elements) ⇒ downstreamDemand += elements - if (downstreamDemand < 0) { - // Long has overflown, reactive-streams specification rule 3.17 - val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue) - stop(Errored(demandOverflowException)) - } else + if (downstreamDemand < 0) // Long has overflown, reactive-streams specification rule 3.17 + stop(Errored(new IllegalStateException(TotalPendingDemandMustNotExceedLongMaxValue))) + else push() case PushMore ⇒ push() @@ -84,7 +85,7 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia if (downstreamDemand > 0) { downstreamDemand -= 1 val hasNext = { - subscriber.onNext(iterator.next()) + tryOnNext(subscriber, iterator.next()) iterator.hasNext } if (!hasNext) @@ -101,20 +102,22 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia } private def registerSubscriber(sub: Subscriber[Any]): Unit = { - if (subscriber eq null) { - subscriber = sub - subscriber.onSubscribe(new ActorSubscription(self, sub)) - } else - sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}")) + subscriber match { + case null ⇒ + subscriber = sub + tryOnSubscribe(sub, new ActorSubscription(self, sub)) + case _ ⇒ + rejectAdditionalSubscriber(sub, exposedPublisher) + } } - private def stop(reason: State): Unit = { + private def stop(reason: StopState): Unit = { state match { - case _: Errored | Cancelled | Completed ⇒ throw new IllegalStateException - case _ ⇒ // ok + case _: StopState ⇒ throw new IllegalStateException(s"Already stopped. Transition attempted from $state to $reason") + case _ ⇒ + state = reason + context.stop(self) } - state = reason - context.stop(self) } override def postStop(): Unit = { @@ -122,10 +125,10 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia case Unitialized | Initialized | Cancelled ⇒ if (exposedPublisher ne null) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) case Completed ⇒ - subscriber.onComplete() + tryOnComplete(subscriber) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) case Errored(e) ⇒ - subscriber.onError(e) + tryOnError(subscriber, e) exposedPublisher.shutdown(Some(e)) } // if onComplete or onError throws we let normal supervision take care of it, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index c4c7587ec0..95332f9599 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -107,12 +107,12 @@ final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends create(materializer, flowName)._1.subscribe(flowSubscriber) override def isActive: Boolean = true override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = { - val publisher = try { - val it = iterable.iterator - ActorPublisher[Out](materializer.actorOf(IteratorPublisher.props(it, materializer.settings), name = s"$flowName-0-iterable")) - } catch { - case NonFatal(e) ⇒ ErrorPublisher(e, s"$flowName-0-error").asInstanceOf[Publisher[Out]] - } + val publisher = + try ActorPublisher[Out]( + materializer.actorOf(IteratorPublisher.props(iterable.iterator, materializer.settings), + name = s"$flowName-0-iterable")) catch { + case NonFatal(e) ⇒ ErrorPublisher(e, s"$flowName-0-error").asInstanceOf[Publisher[Out]] + } (publisher, ()) } }