diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala index 21aafa4d0f..a35f490d6c 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala @@ -37,9 +37,6 @@ abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEn override def skipStochasticTests() = true // TODO maybe enable? - // TODO re-enable this test once 1.0.0.RC1 is released, with https://github.com/reactive-streams/reactive-streams/pull/154 - override def spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() = notVerified("TODO Enable this test once https://github.com/reactive-streams/reactive-streams/pull/154 is merged (ETA 1.0.0.RC1)") - @AfterClass def shutdownActorSystem(): Unit = { system.shutdown() diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala new file mode 100644 index 0000000000..0385e167be --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams._ + +import scala.concurrent.Promise + +class FuturePublisherTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + val p = Promise[Int]() + val pub = Source(p.future).runWith(Sink.publisher) + p.success(0) + pub + } + + override def maxElementsFromPublisher(): Long = 1 +} \ No newline at end of file diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala new file mode 100644 index 0000000000..bd3275d5cd --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.impl.SynchronousIterablePublisher + +import scala.collection.immutable +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams._ + +class SyncIterablePublisherTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + val iterable: immutable.Iterable[Int] = + if (elements == Long.MaxValue) + new immutable.Iterable[Int] { override def iterator = Iterator from 0 } + else + 0 until elements.toInt + + Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher) + } + + override def spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() = notVerified("RS TCK 1.0.0.M3 does not handle sync publishers well") +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index 27822c4c99..b9fda1f651 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -118,7 +118,7 @@ trait ActorPublisher[T] extends Actor { import akka.stream.actor.ActorPublisherMessage._ import ActorPublisher.Internal._ import ActorPublisherMessage._ - + import ReactiveStreamsCompliance._ private val state = ActorPublisherState(context.system) private var subscriber: Subscriber[Any] = _ private var demand = 0L @@ -180,7 +180,7 @@ trait ActorPublisher[T] extends Actor { case Active | PreSubscriber ⇒ if (demand > 0) { demand -= 1 - subscriber.onNext(element) + tryOnNext(subscriber, element) } else throw new IllegalStateException( "onNext is not allowed when the stream has not requested elements, totalDemand was 0") @@ -199,8 +199,8 @@ trait ActorPublisher[T] extends Actor { case Active | PreSubscriber ⇒ lifecycleState = Completed if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives - subscriber.onComplete() - subscriber = null // not used after onError + tryOnComplete(subscriber) + subscriber = null // not used after onComplete case Completed ⇒ throw new IllegalStateException("onComplete must only be called once") case _: ErrorEmitted ⇒ @@ -216,7 +216,7 @@ trait ActorPublisher[T] extends Actor { case Active | PreSubscriber ⇒ lifecycleState = ErrorEmitted(cause) if (subscriber ne null) // otherwise onError will be called when the subscription arrives - subscriber.onError(cause) + tryOnError(subscriber, cause) subscriber = null // not used after onError case _: ErrorEmitted ⇒ throw new IllegalStateException("onError must only be called once") @@ -230,13 +230,18 @@ trait ActorPublisher[T] extends Actor { */ protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match { case Request(n) ⇒ - demand += n - if (demand < 0 && lifecycleState == Active) { - // Long has overflown - val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue) - onError(demandOverflowException) - } else - super.aroundReceive(receive, msg) + if (n < 1) { + if (lifecycleState == Active) + onError(numberOfElementsInRequestMustBePositiveException) + else + super.aroundReceive(receive, msg) + } else { + demand += n + if (demand < 0 && lifecycleState == Active) // Long has overflown + onError(totalPendingDemandMustNotExceedLongMaxValueException) + else + super.aroundReceive(receive, msg) + } case Subscribe(sub: Subscriber[_]) ⇒ lifecycleState match { @@ -245,13 +250,12 @@ trait ActorPublisher[T] extends Actor { subscriber = sub lifecycleState = Active sub.onSubscribe(new ActorPublisherSubscription(self)) - case ErrorEmitted(cause) ⇒ sub.onError(cause) - case Completed ⇒ sub.onComplete() + case ErrorEmitted(cause) ⇒ tryOnError(sub, cause) + case Completed ⇒ tryOnComplete(sub) case Active | Canceled ⇒ - if (subscriber == sub) - sub.onError(new IllegalStateException(s"ActorPublisher [$self, sub: $sub] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) - else - sub.onError(new IllegalStateException(s"ActorPublisher [$self] ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}")) + tryOnError(sub, + if (subscriber eq sub) ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException + else ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException) } case Cancel ⇒ @@ -317,7 +321,7 @@ trait ActorPublisher[T] extends Actor { */ protected[akka] override def aroundPostStop(): Unit = { state.remove(self) - if (lifecycleState == Active) subscriber.onComplete() + if (lifecycleState == Active) tryOnComplete(subscriber) super.aroundPostStop() } @@ -341,10 +345,7 @@ private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscri import ActorPublisher._ import ActorPublisherMessage._ - override def request(n: Long): Unit = { - ReactiveStreamsCompliance.validateRequest(n) - ref ! Request(n) - } + override def request(n: Long): Unit = ref ! Request(n) override def cancel(): Unit = ref ! Cancel } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 8c252d0375..109382723e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -129,7 +129,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) } protected def completed: Actor.Receive = { - case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") + case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") // FIXME "shutdown subscriber"?! } protected def inputOnError(e: Throwable): Unit = { @@ -177,9 +177,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D def isClosed: Boolean = downstreamCompleted - protected def createSubscription(): Subscription = { - new ActorSubscription(actor, subscriber) - } + protected def createSubscription(): Subscription = new ActorSubscription(actor, subscriber) private def subscribePending(subscribers: Seq[Subscriber[Any]]): Unit = subscribers foreach { sub ⇒ @@ -201,15 +199,16 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D case SubscribePending ⇒ subscribePending(exposedPublisher.takePendingSubscribers()) case RequestMore(subscription, elements) ⇒ + if (elements < 1) { + cancel(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) + } else { + downstreamDemand += elements + if (downstreamDemand < 1) { // Long has overflown + cancel(ReactiveStreamsCompliance.totalPendingDemandMustNotExceedLongMaxValueException) + } - downstreamDemand += elements - if (downstreamDemand < 0) { - // Long has overflown - val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue) - cancel(demandOverflowException) + pump.pump() // FIXME should this be called even on overflow, sounds like a bug to me } - - pump.pump() case Cancel(subscription) ⇒ downstreamCompleted = true exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 2c75716616..a17372bd2a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -89,9 +89,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { * INTERNAL API */ private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription { - override def request(elements: Long): Unit = - if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) - else impl ! RequestMore(this, elements) + override def request(elements: Long): Unit = impl ! RequestMore(this, elements) override def cancel(): Unit = impl ! Cancel(this) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 8ae9941ec6..a672d2548a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -18,7 +18,8 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] { * INTERNAL API */ private[akka] case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { - override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = subscriber.onError(t) + override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = + ReactiveStreamsCompliance.tryOnError(subscriber, t) // FIXME how to deal with spec violations here? def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] override def toString: String = name } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index dd012d15f5..b84234e2d7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -28,15 +28,13 @@ private[akka] object FuturePublisher { object FutureSubscription { case class Cancel(subscription: FutureSubscription) - case class RequestMore(subscription: FutureSubscription) + case class RequestMore(subscription: FutureSubscription, elements: Long) } class FutureSubscription(ref: ActorRef) extends Subscription { import akka.stream.impl.FuturePublisher.FutureSubscription._ def cancel(): Unit = ref ! Cancel(this) - def request(elements: Long): Unit = - if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) - else ref ! RequestMore(this) + def request(elements: Long): Unit = ref ! RequestMore(this, elements) override def toString = "FutureSubscription" } } @@ -44,11 +42,12 @@ private[akka] object FuturePublisher { /** * INTERNAL API */ -//FIXME why do we need to have an actor to drive a Future? +// FIXME why do we need to have an actor to drive a Future? private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { import akka.stream.impl.FuturePublisher.FutureSubscription import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore + import ReactiveStreamsCompliance._ var exposedPublisher: ActorPublisher[Any] = _ var subscribers = Map.empty[Subscriber[Any], FutureSubscription] @@ -77,42 +76,53 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS def active: Receive = { case SubscribePending ⇒ exposedPublisher.takePendingSubscribers() foreach registerSubscriber - case RequestMore(subscription) ⇒ + case RequestMore(subscription, elements) ⇒ // FIXME we aren't tracking demand per subscription so we don't check for overflow. We should. if (subscriptions.contains(subscription)) { - subscriptionsReadyForPush += subscription - push(subscriptions(subscription)) + if (elements < 1) { + val subscriber = subscriptions(subscription) + rejectDueToNonPositiveDemand(subscriber) + removeSubscriber(subscriber) + } else { + subscriptionsReadyForPush += subscription + push(subscriptions(subscription)) + } } case Cancel(subscription) if subscriptions.contains(subscription) ⇒ removeSubscriber(subscriptions(subscription)) case Status.Failure(ex) ⇒ - futureValue = Some(Failure(ex)) - pushToAll() + if (futureValue.isEmpty) { + futureValue = Some(Failure(ex)) + pushToAll() + } case value ⇒ - futureValue = Some(Success(value)) - pushToAll() + if (futureValue.isEmpty) { + futureValue = Some(Success(value)) + pushToAll() + } } def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription ⇒ push(subscriptions(subscription)) } def push(subscriber: Subscriber[Any]): Unit = futureValue match { case Some(Success(value)) ⇒ - subscriber.onNext(value) - subscriber.onComplete() + + tryOnNext(subscriber, value) + tryOnComplete(subscriber) removeSubscriber(subscriber) case Some(Failure(t)) ⇒ - subscriber.onError(t) + tryOnError(subscriber, t) removeSubscriber(subscriber) case None ⇒ // not completed yet } def registerSubscriber(subscriber: Subscriber[Any]): Unit = { - if (subscribers.contains(subscriber)) - subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) + if (subscribers.contains(subscriber)) // FIXME this is not legal AFAICT, needs to check identity, not equality + rejectDuplicateSubscriber(subscriber) else { val subscription = new FutureSubscription(self) subscribers = subscribers.updated(subscriber, subscription) subscriptions = subscriptions.updated(subscription, subscriber) - subscriber.onSubscribe(subscription) + tryOnSubscribe(subscriber, subscription) } } @@ -127,7 +137,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS } } - override def postStop(): Unit = + override def postStop(): Unit = // FIXME if something blows up, are the subscribers onErrored? if (exposedPublisher ne null) exposedPublisher.shutdown(shutdownReason) diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala index 2874dfd221..2ea5fc030b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala @@ -66,11 +66,15 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia def active: Receive = { case RequestMore(_, elements) ⇒ - downstreamDemand += elements - if (downstreamDemand < 0) // Long has overflown, reactive-streams specification rule 3.17 - stop(Errored(new IllegalStateException(TotalPendingDemandMustNotExceedLongMaxValue))) - else - push() + if (elements < 1) + stop(Errored(numberOfElementsInRequestMustBePositiveException)) + else { + downstreamDemand += elements + if (downstreamDemand < 0) // Long has overflown, reactive-streams specification rule 3.17 + stop(Errored(totalPendingDemandMustNotExceedLongMaxValueException)) + else + push() + } case PushMore ⇒ push() case _: Cancel ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala index 5ed043e98e..882e410047 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala @@ -17,7 +17,7 @@ private[akka] case class RequestMore(subscription: ActorSubscription[_], demand: /** * INTERNAL API */ -private[akka] case class Cancel(subscriptions: ActorSubscription[_]) +private[akka] case class Cancel(subscription: ActorSubscription[_]) /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala index 63d7f87c05..e3e40df3b0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala @@ -20,11 +20,26 @@ private[stream] object ReactiveStreamsCompliance { final val TotalPendingDemandMustNotExceedLongMaxValue = "Total pending demand MUST NOT be > `java.lang.Long.MAX_VALUE` (see reactive-streams specification, rule 3.17)" - final def validateRequest(n: Long): Unit = - if (n < 1) throw new IllegalArgumentException(NumberOfElementsInRequestMustBePositiveMsg) with SpecViolation + final def totalPendingDemandMustNotExceedLongMaxValueException: Throwable = + new IllegalStateException(TotalPendingDemandMustNotExceedLongMaxValue) - final def rejectAdditionalSubscriber[T](subsriber: Subscriber[T], rejector: Publisher[T]): Unit = - tryOnError(subsriber, new IllegalStateException(s"$rejector $SupportsOnlyASingleSubscriber")) + final def numberOfElementsInRequestMustBePositiveException: Throwable = + new IllegalArgumentException(NumberOfElementsInRequestMustBePositiveMsg) + + final def canNotSubscribeTheSameSubscriberMultipleTimesException: Throwable = + new IllegalStateException(CanNotSubscribeTheSameSubscriberMultipleTimes) + + final def rejectDuplicateSubscriber[T](subscriber: Subscriber[T]): Unit = + tryOnError(subscriber, canNotSubscribeTheSameSubscriberMultipleTimesException) + + final def rejectAdditionalSubscriber[T](subscriber: Subscriber[T], rejector: Publisher[T]): Unit = + tryOnError(subscriber, new IllegalStateException(s"$rejector $SupportsOnlyASingleSubscriber")) + + final def rejectDueToOverflow[T](subscriber: Subscriber[T]): Unit = + tryOnError(subscriber, totalPendingDemandMustNotExceedLongMaxValueException) + + final def rejectDueToNonPositiveDemand[T](subscriber: Subscriber[T]): Unit = + tryOnError(subscriber, numberOfElementsInRequestMustBePositiveException) sealed trait SpecViolation { self: Throwable ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 12d88ae812..7260e973fa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -24,9 +24,7 @@ private[akka] object MultiStreamOutputProcessor { case class SubstreamSubscriptionTimeout(substream: SubstreamKey) class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { - override def request(elements: Long): Unit = - if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) - else parent ! SubstreamRequestMore(substreamKey, elements) + override def request(elements: Long): Unit = parent ! SubstreamRequestMore(substreamKey, elements) override def cancel(): Unit = parent ! SubstreamCancel(substreamKey) override def toString = "SubstreamSubscription" + System.identityHashCode(this) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index 9d630c1272..0c19f98ae0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -41,17 +41,6 @@ private[akka] trait SubscriptionWithCursor[T] extends Subscription with Resizabl def dispatch(element: T): Unit = subscriber.onNext(element) - /** Increases the `requested` counter, additionally providing overflow protection */ - def moreRequested(demand: Long): Long = { - val sum = totalDemand + demand - val noOverflow = sum > 0 - - if (noOverflow) totalDemand = sum - else subscriber.onError(new IllegalStateException(s"Total pending demand ($totalDemand + $demand) would overflow `Long`, for Subscriber $subscriber! ${ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue}")) - - sum - } - var active = true /** Do not increment directly, use `moreRequested(Long)` instead (it provides overflow protection)! */ @@ -112,29 +101,43 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff */ protected def moreRequested(subscription: S, elements: Long): Unit = if (subscription.active) { - // returns Long.MinValue if the subscription is to be terminated - @tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long = - if (requested == 0) { - // if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore` - if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0 - } else if (buffer.count(subscription) > 0) { - subscription.dispatch(buffer.read(subscription)) - dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) - } else if (eos ne NotReached) Long.MinValue - else requested + import ReactiveStreamsCompliance._ + // check for illegal demand See 3.9 + if (elements < 1) { + try tryOnError(subscription.subscriber, numberOfElementsInRequestMustBePositiveException) + finally unregisterSubscriptionInternal(subscription) + } else { + endOfStream match { + case eos @ (NotReached | Completed) ⇒ + val demand = subscription.totalDemand + elements + //Check for overflow + if (demand < 1) { + try tryOnError(subscription.subscriber, totalPendingDemandMustNotExceedLongMaxValueException) + finally unregisterSubscriptionInternal(subscription) + } else { + subscription.totalDemand = demand + // returns Long.MinValue if the subscription is to be terminated + @tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long = + if (requested == 0) { + // if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore` + if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0 + } else if (buffer.count(subscription) > 0) { + subscription.dispatch(buffer.read(subscription)) // FIXME this does not gracefully handle the case if onNext throws + dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) + } else if (eos ne NotReached) Long.MinValue + else requested - endOfStream match { - case eos @ (NotReached | Completed) ⇒ - val demand = subscription.moreRequested(elements) - dispatchFromBufferAndReturnRemainingRequested(demand, eos) match { - case Long.MinValue ⇒ - eos(subscription.subscriber) - unregisterSubscriptionInternal(subscription) - case x ⇒ - subscription.totalDemand = x - requestFromUpstreamIfRequired() - } - case ErrorCompleted(_) ⇒ // ignore, the Subscriber might not have seen our error event yet + dispatchFromBufferAndReturnRemainingRequested(demand, eos) match { + case Long.MinValue ⇒ + eos(subscription.subscriber) + unregisterSubscriptionInternal(subscription) + case x ⇒ + subscription.totalDemand = x + requestFromUpstreamIfRequired() + } + } + case ErrorCompleted(_) ⇒ // ignore, the Subscriber might not have seen our error event yet + } } } @@ -212,20 +215,17 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff * Register a new subscriber. */ protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match { - case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒ - subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [${this}, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) - case NotReached ⇒ - val newSubscription = createSubscription(subscriber) - subscriptions ::= newSubscription - buffer.initCursor(newSubscription) - subscriber.onSubscribe(newSubscription) - case Completed if buffer.nonEmpty ⇒ - val newSubscription = createSubscription(subscriber) - subscriptions ::= newSubscription - buffer.initCursor(newSubscription) - subscriber.onSubscribe(newSubscription) - case eos ⇒ - eos(subscriber) + case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒ ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber) + case NotReached ⇒ addSubscription(subscriber) + case Completed if buffer.nonEmpty ⇒ addSubscription(subscriber) + case eos ⇒ eos(subscriber) + } + + protected def addSubscription(subscriber: Subscriber[_ >: T]): Unit = { + val newSubscription = createSubscription(subscriber) + subscriptions ::= newSubscription + buffer.initCursor(newSubscription) + ReactiveStreamsCompliance.tryOnSubscribe(subscriber, newSubscription) // FIXME what if this throws? } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala index a530e6493f..22ac209e13 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala @@ -28,49 +28,66 @@ private[akka] object SynchronousIterablePublisher { var pendingDemand = 0L var pushing = false + import ReactiveStreamsCompliance._ + def init(): Unit = try { - if (!iterator.hasNext) { + if (!iterator.hasNext) { // Let's be prudent and issue onComplete immediately cancel() - subscriber.onSubscribe(this) - subscriber.onComplete() + tryOnSubscribe(subscriber, this) + tryOnComplete(subscriber) } else { - subscriber.onSubscribe(this) + tryOnSubscribe(subscriber, this) } } catch { + case sv: SpecViolation ⇒ + cancel() + throw sv.violation // I think it is prudent to "escalate" the spec violation case NonFatal(e) ⇒ cancel() - subscriber.onError(e) + tryOnError(subscriber, e) } - override def cancel(): Unit = - done = true + override def cancel(): Unit = done = true override def request(elements: Long): Unit = { - if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) - @tailrec def pushNext(): Unit = { - if (!done) - if (iterator.isEmpty) { - cancel() - subscriber.onComplete() // FIXME this is technically incorrect since if onComplete throws an Exception, we'll call onError (illegal) - } else if (pendingDemand > 0) { - pendingDemand -= 1 - subscriber.onNext(iterator.next()) - pushNext() - } - } + if (done) () // According to Reactive Streams Spec 3.6, `request` on a cancelled `Subscription` must be a NoOp + else if (elements < 1) { // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError + cancel() + rejectDueToNonPositiveDemand(subscriber) + } else { + pendingDemand += elements + if (pendingDemand < 1) { // According to Reactive Streams Spec 3:17, if we overflow 2^63-1, we need to yield onError + cancel() + rejectDueToOverflow(subscriber) + } else if (!pushing) { + // According to Reactive Streams Spec 3:3, we must prevent unbounded recursion + try { + pushing = true + pendingDemand = elements - if (pushing) - pendingDemand += elements // reentrant call to requestMore from onNext // FIXME This severely lacks overflow checks - else { - try { - pushing = true - pendingDemand = elements - pushNext() - } catch { - case NonFatal(e) ⇒ - cancel() - subscriber.onError(e) - } finally { pushing = false } + @tailrec def pushNext(): Unit = + if (done) () + else if (iterator.isEmpty) { + cancel() + tryOnComplete(subscriber) + } else if (pendingDemand > 0) { + pendingDemand -= 1 + tryOnNext(subscriber, iterator.next()) + pushNext() + } + + pushNext() + } catch { + case sv: SpecViolation ⇒ + cancel() + throw sv.violation // I think it is prudent to "escalate" the spec violation + case NonFatal(e) ⇒ + cancel() + tryOnError(subscriber, e) + } finally { + pushing = false + } + } } } } @@ -96,7 +113,8 @@ private[akka] final class SynchronousIterablePublisher[T]( import SynchronousIterablePublisher.IteratorSubscription - override def subscribe(subscriber: Subscriber[_ >: T]): Unit = IteratorSubscription(subscriber, iterable.iterator) //FIXME what if .iterator throws? + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = + IteratorSubscription(subscriber, try iterable.iterator catch { case NonFatal(t) ⇒ Iterator.continually(throw t) }) override def toString: String = name } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index a6f21015d2..48ebdfeba5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -29,9 +29,7 @@ private[akka] object TickPublisher { class TickPublisherSubscription(ref: ActorRef) extends Subscription { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ def cancel(): Unit = ref ! Cancel - def request(elements: Long): Unit = - if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) - else ref ! RequestMore(elements) + def request(elements: Long): Unit = ref ! RequestMore(elements) override def toString = "TickPublisherSubscription" } @@ -49,6 +47,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite settings: MaterializerSettings, cancelled: AtomicBoolean) extends Actor with SoftShutdown { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ import akka.stream.impl.TickPublisher._ + import ReactiveStreamsCompliance._ var exposedPublisher: ActorPublisher[Any] = _ private var subscriber: Subscriber[_ >: Any] = null @@ -73,31 +72,36 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite context.become(active) } + def handleError(error: Throwable): Unit = { + try { + if (!error.isInstanceOf[SpecViolation]) + tryOnError(subscriber, error) + } finally { + subscriber = null + exposedPublisher.shutdown(Some(error)) // FIXME should this not be SupportsOnlyASingleSubscriber? + context.stop(self) + } + } + def active: Receive = { case Tick ⇒ try { - val tickElement = tick() + val tickElement = tick() // FIXME should we call this even if we shouldn't send it? if (demand > 0) { demand -= 1 - subscriber.onNext(tickElement) + tryOnNext(subscriber, tickElement) } } catch { - case NonFatal(e) ⇒ - if (subscriber ne null) { - subscriber.onError(e) - subscriber = null - } - exposedPublisher.shutdown(Some(e)) - context.stop(self) + case NonFatal(e) ⇒ handleError(e) } case RequestMore(elements) ⇒ - demand += elements - if (demand < 0) { - // Long has overflown, reactive-streams specification rule 3.17 - exposedPublisher.shutdown(Some( - new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue))) - context.stop(self) + if (elements < 1) { + handleError(numberOfElementsInRequestMustBePositiveException) + } else { + demand += elements + if (demand < 0) // Long has overflown, reactive-streams specification rule 3.17 + handleError(totalPendingDemandMustNotExceedLongMaxValueException) } case Cancel ⇒ @@ -106,25 +110,23 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite case SubscribePending ⇒ exposedPublisher.takePendingSubscribers() foreach registerSubscriber - } - def registerSubscriber(s: Subscriber[_ >: Any]): Unit = { - if (subscriber ne null) s.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) - else { + def registerSubscriber(s: Subscriber[_ >: Any]): Unit = subscriber match { + case null ⇒ val subscription = new TickPublisherSubscription(self) subscriber = s - subscriber.onSubscribe(subscription) - } + tryOnSubscribe(s, subscription) + case _ ⇒ + rejectAdditionalSubscriber(s, exposedPublisher) } override def postStop(): Unit = { tickTask.foreach(_.cancel) cancelled.set(true) - if (subscriber ne null) subscriber.onComplete() + if (subscriber ne null) tryOnComplete(subscriber) if (exposedPublisher ne null) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) } - } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 1641d49a52..d28dd1cf2d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -220,17 +220,19 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundarySta case SubscribePending ⇒ subscribePending(exposedPublisher.takePendingSubscribers()) case RequestMore(subscription, elements) ⇒ - - // TODO centralize overflow protection - downstreamDemand += elements - if (downstreamDemand < 0) { - // Long has overflown - val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue) + if (elements < 1) { enter().finish() - fail(demandOverflowException) - } else if (upstreamWaiting) { - upstreamWaiting = false - enter().pull() + fail(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) + } else { + downstreamDemand += elements + // Long has overflown + if (downstreamDemand < 0) { + enter().finish() + fail(ReactiveStreamsCompliance.totalPendingDemandMustNotExceedLongMaxValueException) + } else if (upstreamWaiting) { + upstreamWaiting = false + enter().pull() + } } case Cancel(subscription) ⇒