=str - 16467 - Updates Akka Streams to Reactive Streams 1.0.0.M3

Only failing test is for the SynchronousIterablePublisher since the TCK doesn't handle sync
publishers well (@ktoso is working to fix this for the 1.0.0.RC1)

Adds a lot of FIXMEs to make sure we fix them before we ship the final version of Akka Streams.
This commit is contained in:
Viktor Klang 2014-12-08 23:10:04 +01:00
parent bd3ee6b54f
commit bea8a46dee
16 changed files with 281 additions and 188 deletions

View file

@ -37,9 +37,6 @@ abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEn
override def skipStochasticTests() = true // TODO maybe enable? override def skipStochasticTests() = true // TODO maybe enable?
// TODO re-enable this test once 1.0.0.RC1 is released, with https://github.com/reactive-streams/reactive-streams/pull/154
override def spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() = notVerified("TODO Enable this test once https://github.com/reactive-streams/reactive-streams/pull/154 is merged (ETA 1.0.0.RC1)")
@AfterClass @AfterClass
def shutdownActorSystem(): Unit = { def shutdownActorSystem(): Unit = {
system.shutdown() system.shutdown()

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams._
import scala.concurrent.Promise
class FuturePublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val p = Promise[Int]()
val pub = Source(p.future).runWith(Sink.publisher)
p.success(0)
pub
}
override def maxElementsFromPublisher(): Long = 1
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.impl.SynchronousIterablePublisher
import scala.collection.immutable
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams._
class SyncIterablePublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == Long.MaxValue)
new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else
0 until elements.toInt
Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher)
}
override def spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() = notVerified("RS TCK 1.0.0.M3 does not handle sync publishers well")
}

View file

@ -118,7 +118,7 @@ trait ActorPublisher[T] extends Actor {
import akka.stream.actor.ActorPublisherMessage._ import akka.stream.actor.ActorPublisherMessage._
import ActorPublisher.Internal._ import ActorPublisher.Internal._
import ActorPublisherMessage._ import ActorPublisherMessage._
import ReactiveStreamsCompliance._
private val state = ActorPublisherState(context.system) private val state = ActorPublisherState(context.system)
private var subscriber: Subscriber[Any] = _ private var subscriber: Subscriber[Any] = _
private var demand = 0L private var demand = 0L
@ -180,7 +180,7 @@ trait ActorPublisher[T] extends Actor {
case Active | PreSubscriber case Active | PreSubscriber
if (demand > 0) { if (demand > 0) {
demand -= 1 demand -= 1
subscriber.onNext(element) tryOnNext(subscriber, element)
} else } else
throw new IllegalStateException( throw new IllegalStateException(
"onNext is not allowed when the stream has not requested elements, totalDemand was 0") "onNext is not allowed when the stream has not requested elements, totalDemand was 0")
@ -199,8 +199,8 @@ trait ActorPublisher[T] extends Actor {
case Active | PreSubscriber case Active | PreSubscriber
lifecycleState = Completed lifecycleState = Completed
if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives
subscriber.onComplete() tryOnComplete(subscriber)
subscriber = null // not used after onError subscriber = null // not used after onComplete
case Completed case Completed
throw new IllegalStateException("onComplete must only be called once") throw new IllegalStateException("onComplete must only be called once")
case _: ErrorEmitted case _: ErrorEmitted
@ -216,7 +216,7 @@ trait ActorPublisher[T] extends Actor {
case Active | PreSubscriber case Active | PreSubscriber
lifecycleState = ErrorEmitted(cause) lifecycleState = ErrorEmitted(cause)
if (subscriber ne null) // otherwise onError will be called when the subscription arrives if (subscriber ne null) // otherwise onError will be called when the subscription arrives
subscriber.onError(cause) tryOnError(subscriber, cause)
subscriber = null // not used after onError subscriber = null // not used after onError
case _: ErrorEmitted case _: ErrorEmitted
throw new IllegalStateException("onError must only be called once") throw new IllegalStateException("onError must only be called once")
@ -230,13 +230,18 @@ trait ActorPublisher[T] extends Actor {
*/ */
protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match { protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match {
case Request(n) case Request(n)
demand += n if (n < 1) {
if (demand < 0 && lifecycleState == Active) { if (lifecycleState == Active)
// Long has overflown onError(numberOfElementsInRequestMustBePositiveException)
val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue) else
onError(demandOverflowException) super.aroundReceive(receive, msg)
} else } else {
super.aroundReceive(receive, msg) demand += n
if (demand < 0 && lifecycleState == Active) // Long has overflown
onError(totalPendingDemandMustNotExceedLongMaxValueException)
else
super.aroundReceive(receive, msg)
}
case Subscribe(sub: Subscriber[_]) case Subscribe(sub: Subscriber[_])
lifecycleState match { lifecycleState match {
@ -245,13 +250,12 @@ trait ActorPublisher[T] extends Actor {
subscriber = sub subscriber = sub
lifecycleState = Active lifecycleState = Active
sub.onSubscribe(new ActorPublisherSubscription(self)) sub.onSubscribe(new ActorPublisherSubscription(self))
case ErrorEmitted(cause) sub.onError(cause) case ErrorEmitted(cause) tryOnError(sub, cause)
case Completed sub.onComplete() case Completed tryOnComplete(sub)
case Active | Canceled case Active | Canceled
if (subscriber == sub) tryOnError(sub,
sub.onError(new IllegalStateException(s"ActorPublisher [$self, sub: $sub] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) if (subscriber eq sub) ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException
else else ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException)
sub.onError(new IllegalStateException(s"ActorPublisher [$self] ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}"))
} }
case Cancel case Cancel
@ -317,7 +321,7 @@ trait ActorPublisher[T] extends Actor {
*/ */
protected[akka] override def aroundPostStop(): Unit = { protected[akka] override def aroundPostStop(): Unit = {
state.remove(self) state.remove(self)
if (lifecycleState == Active) subscriber.onComplete() if (lifecycleState == Active) tryOnComplete(subscriber)
super.aroundPostStop() super.aroundPostStop()
} }
@ -341,10 +345,7 @@ private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscri
import ActorPublisher._ import ActorPublisher._
import ActorPublisherMessage._ import ActorPublisherMessage._
override def request(n: Long): Unit = { override def request(n: Long): Unit = ref ! Request(n)
ReactiveStreamsCompliance.validateRequest(n)
ref ! Request(n)
}
override def cancel(): Unit = ref ! Cancel override def cancel(): Unit = ref ! Cancel
} }

View file

@ -129,7 +129,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
} }
protected def completed: Actor.Receive = { protected def completed: Actor.Receive = {
case OnSubscribe(subscription) throw new IllegalStateException("Cannot subscribe shutdown subscriber") case OnSubscribe(subscription) throw new IllegalStateException("Cannot subscribe shutdown subscriber") // FIXME "shutdown subscriber"?!
} }
protected def inputOnError(e: Throwable): Unit = { protected def inputOnError(e: Throwable): Unit = {
@ -177,9 +177,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
def isClosed: Boolean = downstreamCompleted def isClosed: Boolean = downstreamCompleted
protected def createSubscription(): Subscription = { protected def createSubscription(): Subscription = new ActorSubscription(actor, subscriber)
new ActorSubscription(actor, subscriber)
}
private def subscribePending(subscribers: Seq[Subscriber[Any]]): Unit = private def subscribePending(subscribers: Seq[Subscriber[Any]]): Unit =
subscribers foreach { sub subscribers foreach { sub
@ -201,15 +199,16 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
case SubscribePending case SubscribePending
subscribePending(exposedPublisher.takePendingSubscribers()) subscribePending(exposedPublisher.takePendingSubscribers())
case RequestMore(subscription, elements) case RequestMore(subscription, elements)
if (elements < 1) {
cancel(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
} else {
downstreamDemand += elements
if (downstreamDemand < 1) { // Long has overflown
cancel(ReactiveStreamsCompliance.totalPendingDemandMustNotExceedLongMaxValueException)
}
downstreamDemand += elements pump.pump() // FIXME should this be called even on overflow, sounds like a bug to me
if (downstreamDemand < 0) {
// Long has overflown
val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue)
cancel(demandOverflowException)
} }
pump.pump()
case Cancel(subscription) case Cancel(subscription)
downstreamCompleted = true downstreamCompleted = true
exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException)) exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException))

View file

@ -89,9 +89,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
* INTERNAL API * INTERNAL API
*/ */
private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription { private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription {
override def request(elements: Long): Unit = override def request(elements: Long): Unit = impl ! RequestMore(this, elements)
if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
else impl ! RequestMore(this, elements)
override def cancel(): Unit = impl ! Cancel(this) override def cancel(): Unit = impl ! Cancel(this)
} }

View file

@ -18,7 +18,8 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] {
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { private[akka] case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] {
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = subscriber.onError(t) override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
ReactiveStreamsCompliance.tryOnError(subscriber, t) // FIXME how to deal with spec violations here?
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
override def toString: String = name override def toString: String = name
} }

View file

@ -28,15 +28,13 @@ private[akka] object FuturePublisher {
object FutureSubscription { object FutureSubscription {
case class Cancel(subscription: FutureSubscription) case class Cancel(subscription: FutureSubscription)
case class RequestMore(subscription: FutureSubscription) case class RequestMore(subscription: FutureSubscription, elements: Long)
} }
class FutureSubscription(ref: ActorRef) extends Subscription { class FutureSubscription(ref: ActorRef) extends Subscription {
import akka.stream.impl.FuturePublisher.FutureSubscription._ import akka.stream.impl.FuturePublisher.FutureSubscription._
def cancel(): Unit = ref ! Cancel(this) def cancel(): Unit = ref ! Cancel(this)
def request(elements: Long): Unit = def request(elements: Long): Unit = ref ! RequestMore(this, elements)
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
else ref ! RequestMore(this)
override def toString = "FutureSubscription" override def toString = "FutureSubscription"
} }
} }
@ -44,11 +42,12 @@ private[akka] object FuturePublisher {
/** /**
* INTERNAL API * INTERNAL API
*/ */
//FIXME why do we need to have an actor to drive a Future? // FIXME why do we need to have an actor to drive a Future?
private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown {
import akka.stream.impl.FuturePublisher.FutureSubscription import akka.stream.impl.FuturePublisher.FutureSubscription
import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel
import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore
import ReactiveStreamsCompliance._
var exposedPublisher: ActorPublisher[Any] = _ var exposedPublisher: ActorPublisher[Any] = _
var subscribers = Map.empty[Subscriber[Any], FutureSubscription] var subscribers = Map.empty[Subscriber[Any], FutureSubscription]
@ -77,42 +76,53 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS
def active: Receive = { def active: Receive = {
case SubscribePending case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber exposedPublisher.takePendingSubscribers() foreach registerSubscriber
case RequestMore(subscription) case RequestMore(subscription, elements) // FIXME we aren't tracking demand per subscription so we don't check for overflow. We should.
if (subscriptions.contains(subscription)) { if (subscriptions.contains(subscription)) {
subscriptionsReadyForPush += subscription if (elements < 1) {
push(subscriptions(subscription)) val subscriber = subscriptions(subscription)
rejectDueToNonPositiveDemand(subscriber)
removeSubscriber(subscriber)
} else {
subscriptionsReadyForPush += subscription
push(subscriptions(subscription))
}
} }
case Cancel(subscription) if subscriptions.contains(subscription) case Cancel(subscription) if subscriptions.contains(subscription)
removeSubscriber(subscriptions(subscription)) removeSubscriber(subscriptions(subscription))
case Status.Failure(ex) case Status.Failure(ex)
futureValue = Some(Failure(ex)) if (futureValue.isEmpty) {
pushToAll() futureValue = Some(Failure(ex))
pushToAll()
}
case value case value
futureValue = Some(Success(value)) if (futureValue.isEmpty) {
pushToAll() futureValue = Some(Success(value))
pushToAll()
}
} }
def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription push(subscriptions(subscription)) } def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription push(subscriptions(subscription)) }
def push(subscriber: Subscriber[Any]): Unit = futureValue match { def push(subscriber: Subscriber[Any]): Unit = futureValue match {
case Some(Success(value)) case Some(Success(value))
subscriber.onNext(value)
subscriber.onComplete() tryOnNext(subscriber, value)
tryOnComplete(subscriber)
removeSubscriber(subscriber) removeSubscriber(subscriber)
case Some(Failure(t)) case Some(Failure(t))
subscriber.onError(t) tryOnError(subscriber, t)
removeSubscriber(subscriber) removeSubscriber(subscriber)
case None // not completed yet case None // not completed yet
} }
def registerSubscriber(subscriber: Subscriber[Any]): Unit = { def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
if (subscribers.contains(subscriber)) if (subscribers.contains(subscriber)) // FIXME this is not legal AFAICT, needs to check identity, not equality
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) rejectDuplicateSubscriber(subscriber)
else { else {
val subscription = new FutureSubscription(self) val subscription = new FutureSubscription(self)
subscribers = subscribers.updated(subscriber, subscription) subscribers = subscribers.updated(subscriber, subscription)
subscriptions = subscriptions.updated(subscription, subscriber) subscriptions = subscriptions.updated(subscription, subscriber)
subscriber.onSubscribe(subscription) tryOnSubscribe(subscriber, subscription)
} }
} }
@ -127,7 +137,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS
} }
} }
override def postStop(): Unit = override def postStop(): Unit = // FIXME if something blows up, are the subscribers onErrored?
if (exposedPublisher ne null) if (exposedPublisher ne null)
exposedPublisher.shutdown(shutdownReason) exposedPublisher.shutdown(shutdownReason)

View file

@ -66,11 +66,15 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia
def active: Receive = { def active: Receive = {
case RequestMore(_, elements) case RequestMore(_, elements)
downstreamDemand += elements if (elements < 1)
if (downstreamDemand < 0) // Long has overflown, reactive-streams specification rule 3.17 stop(Errored(numberOfElementsInRequestMustBePositiveException))
stop(Errored(new IllegalStateException(TotalPendingDemandMustNotExceedLongMaxValue))) else {
else downstreamDemand += elements
push() if (downstreamDemand < 0) // Long has overflown, reactive-streams specification rule 3.17
stop(Errored(totalPendingDemandMustNotExceedLongMaxValueException))
else
push()
}
case PushMore case PushMore
push() push()
case _: Cancel case _: Cancel

View file

@ -17,7 +17,7 @@ private[akka] case class RequestMore(subscription: ActorSubscription[_], demand:
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class Cancel(subscriptions: ActorSubscription[_]) private[akka] case class Cancel(subscription: ActorSubscription[_])
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -20,11 +20,26 @@ private[stream] object ReactiveStreamsCompliance {
final val TotalPendingDemandMustNotExceedLongMaxValue = final val TotalPendingDemandMustNotExceedLongMaxValue =
"Total pending demand MUST NOT be > `java.lang.Long.MAX_VALUE` (see reactive-streams specification, rule 3.17)" "Total pending demand MUST NOT be > `java.lang.Long.MAX_VALUE` (see reactive-streams specification, rule 3.17)"
final def validateRequest(n: Long): Unit = final def totalPendingDemandMustNotExceedLongMaxValueException: Throwable =
if (n < 1) throw new IllegalArgumentException(NumberOfElementsInRequestMustBePositiveMsg) with SpecViolation new IllegalStateException(TotalPendingDemandMustNotExceedLongMaxValue)
final def rejectAdditionalSubscriber[T](subsriber: Subscriber[T], rejector: Publisher[T]): Unit = final def numberOfElementsInRequestMustBePositiveException: Throwable =
tryOnError(subsriber, new IllegalStateException(s"$rejector $SupportsOnlyASingleSubscriber")) new IllegalArgumentException(NumberOfElementsInRequestMustBePositiveMsg)
final def canNotSubscribeTheSameSubscriberMultipleTimesException: Throwable =
new IllegalStateException(CanNotSubscribeTheSameSubscriberMultipleTimes)
final def rejectDuplicateSubscriber[T](subscriber: Subscriber[T]): Unit =
tryOnError(subscriber, canNotSubscribeTheSameSubscriberMultipleTimesException)
final def rejectAdditionalSubscriber[T](subscriber: Subscriber[T], rejector: Publisher[T]): Unit =
tryOnError(subscriber, new IllegalStateException(s"$rejector $SupportsOnlyASingleSubscriber"))
final def rejectDueToOverflow[T](subscriber: Subscriber[T]): Unit =
tryOnError(subscriber, totalPendingDemandMustNotExceedLongMaxValueException)
final def rejectDueToNonPositiveDemand[T](subscriber: Subscriber[T]): Unit =
tryOnError(subscriber, numberOfElementsInRequestMustBePositiveException)
sealed trait SpecViolation { sealed trait SpecViolation {
self: Throwable self: Throwable

View file

@ -24,9 +24,7 @@ private[akka] object MultiStreamOutputProcessor {
case class SubstreamSubscriptionTimeout(substream: SubstreamKey) case class SubstreamSubscriptionTimeout(substream: SubstreamKey)
class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription {
override def request(elements: Long): Unit = override def request(elements: Long): Unit = parent ! SubstreamRequestMore(substreamKey, elements)
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
else parent ! SubstreamRequestMore(substreamKey, elements)
override def cancel(): Unit = parent ! SubstreamCancel(substreamKey) override def cancel(): Unit = parent ! SubstreamCancel(substreamKey)
override def toString = "SubstreamSubscription" + System.identityHashCode(this) override def toString = "SubstreamSubscription" + System.identityHashCode(this)
} }

View file

@ -41,17 +41,6 @@ private[akka] trait SubscriptionWithCursor[T] extends Subscription with Resizabl
def dispatch(element: T): Unit = subscriber.onNext(element) def dispatch(element: T): Unit = subscriber.onNext(element)
/** Increases the `requested` counter, additionally providing overflow protection */
def moreRequested(demand: Long): Long = {
val sum = totalDemand + demand
val noOverflow = sum > 0
if (noOverflow) totalDemand = sum
else subscriber.onError(new IllegalStateException(s"Total pending demand ($totalDemand + $demand) would overflow `Long`, for Subscriber $subscriber! ${ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue}"))
sum
}
var active = true var active = true
/** Do not increment directly, use `moreRequested(Long)` instead (it provides overflow protection)! */ /** Do not increment directly, use `moreRequested(Long)` instead (it provides overflow protection)! */
@ -112,29 +101,43 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
*/ */
protected def moreRequested(subscription: S, elements: Long): Unit = protected def moreRequested(subscription: S, elements: Long): Unit =
if (subscription.active) { if (subscription.active) {
// returns Long.MinValue if the subscription is to be terminated import ReactiveStreamsCompliance._
@tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long = // check for illegal demand See 3.9
if (requested == 0) { if (elements < 1) {
// if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore` try tryOnError(subscription.subscriber, numberOfElementsInRequestMustBePositiveException)
if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0 finally unregisterSubscriptionInternal(subscription)
} else if (buffer.count(subscription) > 0) { } else {
subscription.dispatch(buffer.read(subscription)) endOfStream match {
dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) case eos @ (NotReached | Completed)
} else if (eos ne NotReached) Long.MinValue val demand = subscription.totalDemand + elements
else requested //Check for overflow
if (demand < 1) {
try tryOnError(subscription.subscriber, totalPendingDemandMustNotExceedLongMaxValueException)
finally unregisterSubscriptionInternal(subscription)
} else {
subscription.totalDemand = demand
// returns Long.MinValue if the subscription is to be terminated
@tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long =
if (requested == 0) {
// if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore`
if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0
} else if (buffer.count(subscription) > 0) {
subscription.dispatch(buffer.read(subscription)) // FIXME this does not gracefully handle the case if onNext throws
dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos)
} else if (eos ne NotReached) Long.MinValue
else requested
endOfStream match { dispatchFromBufferAndReturnRemainingRequested(demand, eos) match {
case eos @ (NotReached | Completed) case Long.MinValue
val demand = subscription.moreRequested(elements) eos(subscription.subscriber)
dispatchFromBufferAndReturnRemainingRequested(demand, eos) match { unregisterSubscriptionInternal(subscription)
case Long.MinValue case x
eos(subscription.subscriber) subscription.totalDemand = x
unregisterSubscriptionInternal(subscription) requestFromUpstreamIfRequired()
case x }
subscription.totalDemand = x }
requestFromUpstreamIfRequired() case ErrorCompleted(_) // ignore, the Subscriber might not have seen our error event yet
} }
case ErrorCompleted(_) // ignore, the Subscriber might not have seen our error event yet
} }
} }
@ -212,20 +215,17 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
* Register a new subscriber. * Register a new subscriber.
*/ */
protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match { protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match {
case NotReached if subscriptions.exists(_.subscriber eq subscriber) case NotReached if subscriptions.exists(_.subscriber eq subscriber) ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber)
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [${this}, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) case NotReached addSubscription(subscriber)
case NotReached case Completed if buffer.nonEmpty addSubscription(subscriber)
val newSubscription = createSubscription(subscriber) case eos eos(subscriber)
subscriptions ::= newSubscription }
buffer.initCursor(newSubscription)
subscriber.onSubscribe(newSubscription) protected def addSubscription(subscriber: Subscriber[_ >: T]): Unit = {
case Completed if buffer.nonEmpty val newSubscription = createSubscription(subscriber)
val newSubscription = createSubscription(subscriber) subscriptions ::= newSubscription
subscriptions ::= newSubscription buffer.initCursor(newSubscription)
buffer.initCursor(newSubscription) ReactiveStreamsCompliance.tryOnSubscribe(subscriber, newSubscription) // FIXME what if this throws?
subscriber.onSubscribe(newSubscription)
case eos
eos(subscriber)
} }
/** /**

View file

@ -28,49 +28,66 @@ private[akka] object SynchronousIterablePublisher {
var pendingDemand = 0L var pendingDemand = 0L
var pushing = false var pushing = false
import ReactiveStreamsCompliance._
def init(): Unit = try { def init(): Unit = try {
if (!iterator.hasNext) { if (!iterator.hasNext) { // Let's be prudent and issue onComplete immediately
cancel() cancel()
subscriber.onSubscribe(this) tryOnSubscribe(subscriber, this)
subscriber.onComplete() tryOnComplete(subscriber)
} else { } else {
subscriber.onSubscribe(this) tryOnSubscribe(subscriber, this)
} }
} catch { } catch {
case sv: SpecViolation
cancel()
throw sv.violation // I think it is prudent to "escalate" the spec violation
case NonFatal(e) case NonFatal(e)
cancel() cancel()
subscriber.onError(e) tryOnError(subscriber, e)
} }
override def cancel(): Unit = override def cancel(): Unit = done = true
done = true
override def request(elements: Long): Unit = { override def request(elements: Long): Unit = {
if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) if (done) () // According to Reactive Streams Spec 3.6, `request` on a cancelled `Subscription` must be a NoOp
@tailrec def pushNext(): Unit = { else if (elements < 1) { // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError
if (!done) cancel()
if (iterator.isEmpty) { rejectDueToNonPositiveDemand(subscriber)
cancel() } else {
subscriber.onComplete() // FIXME this is technically incorrect since if onComplete throws an Exception, we'll call onError (illegal) pendingDemand += elements
} else if (pendingDemand > 0) { if (pendingDemand < 1) { // According to Reactive Streams Spec 3:17, if we overflow 2^63-1, we need to yield onError
pendingDemand -= 1 cancel()
subscriber.onNext(iterator.next()) rejectDueToOverflow(subscriber)
pushNext() } else if (!pushing) {
} // According to Reactive Streams Spec 3:3, we must prevent unbounded recursion
} try {
pushing = true
pendingDemand = elements
if (pushing) @tailrec def pushNext(): Unit =
pendingDemand += elements // reentrant call to requestMore from onNext // FIXME This severely lacks overflow checks if (done) ()
else { else if (iterator.isEmpty) {
try { cancel()
pushing = true tryOnComplete(subscriber)
pendingDemand = elements } else if (pendingDemand > 0) {
pushNext() pendingDemand -= 1
} catch { tryOnNext(subscriber, iterator.next())
case NonFatal(e) pushNext()
cancel() }
subscriber.onError(e)
} finally { pushing = false } pushNext()
} catch {
case sv: SpecViolation
cancel()
throw sv.violation // I think it is prudent to "escalate" the spec violation
case NonFatal(e)
cancel()
tryOnError(subscriber, e)
} finally {
pushing = false
}
}
} }
} }
} }
@ -96,7 +113,8 @@ private[akka] final class SynchronousIterablePublisher[T](
import SynchronousIterablePublisher.IteratorSubscription import SynchronousIterablePublisher.IteratorSubscription
override def subscribe(subscriber: Subscriber[_ >: T]): Unit = IteratorSubscription(subscriber, iterable.iterator) //FIXME what if .iterator throws? override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
IteratorSubscription(subscriber, try iterable.iterator catch { case NonFatal(t) Iterator.continually(throw t) })
override def toString: String = name override def toString: String = name
} }

View file

@ -29,9 +29,7 @@ private[akka] object TickPublisher {
class TickPublisherSubscription(ref: ActorRef) extends Subscription { class TickPublisherSubscription(ref: ActorRef) extends Subscription {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._ import akka.stream.impl.TickPublisher.TickPublisherSubscription._
def cancel(): Unit = ref ! Cancel def cancel(): Unit = ref ! Cancel
def request(elements: Long): Unit = def request(elements: Long): Unit = ref ! RequestMore(elements)
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
else ref ! RequestMore(elements)
override def toString = "TickPublisherSubscription" override def toString = "TickPublisherSubscription"
} }
@ -49,6 +47,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
settings: MaterializerSettings, cancelled: AtomicBoolean) extends Actor with SoftShutdown { settings: MaterializerSettings, cancelled: AtomicBoolean) extends Actor with SoftShutdown {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._ import akka.stream.impl.TickPublisher.TickPublisherSubscription._
import akka.stream.impl.TickPublisher._ import akka.stream.impl.TickPublisher._
import ReactiveStreamsCompliance._
var exposedPublisher: ActorPublisher[Any] = _ var exposedPublisher: ActorPublisher[Any] = _
private var subscriber: Subscriber[_ >: Any] = null private var subscriber: Subscriber[_ >: Any] = null
@ -73,31 +72,36 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
context.become(active) context.become(active)
} }
def handleError(error: Throwable): Unit = {
try {
if (!error.isInstanceOf[SpecViolation])
tryOnError(subscriber, error)
} finally {
subscriber = null
exposedPublisher.shutdown(Some(error)) // FIXME should this not be SupportsOnlyASingleSubscriber?
context.stop(self)
}
}
def active: Receive = { def active: Receive = {
case Tick case Tick
try { try {
val tickElement = tick() val tickElement = tick() // FIXME should we call this even if we shouldn't send it?
if (demand > 0) { if (demand > 0) {
demand -= 1 demand -= 1
subscriber.onNext(tickElement) tryOnNext(subscriber, tickElement)
} }
} catch { } catch {
case NonFatal(e) case NonFatal(e) handleError(e)
if (subscriber ne null) {
subscriber.onError(e)
subscriber = null
}
exposedPublisher.shutdown(Some(e))
context.stop(self)
} }
case RequestMore(elements) case RequestMore(elements)
demand += elements if (elements < 1) {
if (demand < 0) { handleError(numberOfElementsInRequestMustBePositiveException)
// Long has overflown, reactive-streams specification rule 3.17 } else {
exposedPublisher.shutdown(Some( demand += elements
new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue))) if (demand < 0) // Long has overflown, reactive-streams specification rule 3.17
context.stop(self) handleError(totalPendingDemandMustNotExceedLongMaxValueException)
} }
case Cancel case Cancel
@ -106,25 +110,23 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
case SubscribePending case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber exposedPublisher.takePendingSubscribers() foreach registerSubscriber
} }
def registerSubscriber(s: Subscriber[_ >: Any]): Unit = { def registerSubscriber(s: Subscriber[_ >: Any]): Unit = subscriber match {
if (subscriber ne null) s.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) case null
else {
val subscription = new TickPublisherSubscription(self) val subscription = new TickPublisherSubscription(self)
subscriber = s subscriber = s
subscriber.onSubscribe(subscription) tryOnSubscribe(s, subscription)
} case _
rejectAdditionalSubscriber(s, exposedPublisher)
} }
override def postStop(): Unit = { override def postStop(): Unit = {
tickTask.foreach(_.cancel) tickTask.foreach(_.cancel)
cancelled.set(true) cancelled.set(true)
if (subscriber ne null) subscriber.onComplete() if (subscriber ne null) tryOnComplete(subscriber)
if (exposedPublisher ne null) if (exposedPublisher ne null)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
} }
} }

View file

@ -220,17 +220,19 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundarySta
case SubscribePending case SubscribePending
subscribePending(exposedPublisher.takePendingSubscribers()) subscribePending(exposedPublisher.takePendingSubscribers())
case RequestMore(subscription, elements) case RequestMore(subscription, elements)
if (elements < 1) {
// TODO centralize overflow protection
downstreamDemand += elements
if (downstreamDemand < 0) {
// Long has overflown
val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue)
enter().finish() enter().finish()
fail(demandOverflowException) fail(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
} else if (upstreamWaiting) { } else {
upstreamWaiting = false downstreamDemand += elements
enter().pull() // Long has overflown
if (downstreamDemand < 0) {
enter().finish()
fail(ReactiveStreamsCompliance.totalPendingDemandMustNotExceedLongMaxValueException)
} else if (upstreamWaiting) {
upstreamWaiting = false
enter().pull()
}
} }
case Cancel(subscription) case Cancel(subscription)