Merge pull request #16770 from akka/wip-16331-onError-patriknw

=str #16331 Use ReactiveStreamsCompliance utility everywhere
This commit is contained in:
Patrik Nordwall 2015-02-03 14:32:16 +01:00
commit 2eda4ee1a5
15 changed files with 113 additions and 73 deletions

View file

@ -199,8 +199,9 @@ trait ActorPublisher[T] extends Actor {
case Active | PreSubscriber
lifecycleState = Completed
if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives
tryOnComplete(subscriber)
subscriber = null // not used after onComplete
try tryOnComplete(subscriber) finally {
subscriber = null // not used after onComplete
}
case Completed
throw new IllegalStateException("onComplete must only be called once")
case _: ErrorEmitted
@ -216,8 +217,8 @@ trait ActorPublisher[T] extends Actor {
case Active | PreSubscriber
lifecycleState = ErrorEmitted(cause)
if (subscriber ne null) // otherwise onError will be called when the subscription arrives
tryOnError(subscriber, cause)
subscriber = null // not used after onError
try tryOnError(subscriber, cause) finally
subscriber = null // not used after onError
case _: ErrorEmitted
throw new IllegalStateException("onError must only be called once")
case Completed
@ -249,7 +250,7 @@ trait ActorPublisher[T] extends Actor {
scheduledSubscriptionTimeout.cancel()
subscriber = sub
lifecycleState = Active
sub.onSubscribe(new ActorPublisherSubscription(self))
tryOnSubscribe(sub, new ActorPublisherSubscription(self))
case ErrorEmitted(cause) tryOnError(sub, cause)
case Completed tryOnComplete(sub)
case Active | Canceled
@ -321,8 +322,8 @@ trait ActorPublisher[T] extends Actor {
*/
protected[akka] override def aroundPostStop(): Unit = {
state.remove(self)
if (lifecycleState == Active) tryOnComplete(subscriber)
super.aroundPostStop()
try if (lifecycleState == Active) tryOnComplete(subscriber)
finally super.aroundPostStop()
}
}

View file

@ -4,12 +4,12 @@
package akka.stream.impl
import java.util.Arrays
import akka.actor._
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError }
import org.reactivestreams.{ Subscriber, Subscription, Processor }
import akka.event.Logging
/**
* INTERNAL API
@ -142,6 +142,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
* INTERNAL API
*/
private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends DefaultOutputTransferStates {
import ReactiveStreamsCompliance._
protected var exposedPublisher: ActorPublisher[Any] = _
@ -156,22 +157,22 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
def enqueueOutputElement(elem: Any): Unit = {
downstreamDemand -= 1
subscriber.onNext(elem)
tryOnNext(subscriber, elem)
}
def complete(): Unit = {
if (!downstreamCompleted) {
downstreamCompleted = true
if (subscriber ne null) subscriber.onComplete()
if (exposedPublisher ne null) exposedPublisher.shutdown(None)
if (subscriber ne null) tryOnComplete(subscriber)
}
}
def cancel(e: Throwable): Unit = {
if (!downstreamCompleted) {
downstreamCompleted = true
if (subscriber ne null) subscriber.onError(e)
if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e)
}
}
@ -183,8 +184,9 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
subscribers foreach { sub
if (subscriber eq null) {
subscriber = sub
subscriber.onSubscribe(createSubscription())
} else sub.onError(new IllegalStateException(s"${getClass.getSimpleName} ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}"))
tryOnSubscribe(subscriber, createSubscription())
} else
tryOnError(sub, new IllegalStateException(s"${Logging.simpleName(this)} ${SupportsOnlyASingleSubscriber}"))
}
protected def waitingExposedPublisher: Actor.Receive = {

View file

@ -16,7 +16,7 @@ import org.reactivestreams.Subscription
* INTERNAL API
*/
private[akka] object ActorPublisher {
class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down spi.Publisher") with NoStackTrace
class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down Publisher") with NoStackTrace
val NormalShutdownReason: Option[Throwable] = Some(new NormalShutdownException)
def apply[T](impl: ActorRef): ActorPublisher[T] = {
@ -35,6 +35,7 @@ private[akka] object ActorPublisher {
* ActorRef! If you don't need to subclass, prefer the apply() method on the companion object which takes care of this.
*/
private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
import ReactiveStreamsCompliance._
// The subscriber of an subscription attempt is first placed in this list of pending subscribers.
// The actor will call takePendingSubscribers to remove it from the list when it has received the
@ -78,9 +79,16 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
@volatile private var shutdownReason: Option[Throwable] = None
private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit =
shutdownReason match {
case Some(e) subscriber.onError(e)
case None subscriber.onComplete()
try shutdownReason match {
case Some(e: SpecViolation) // ok, not allowed to call onError
case Some(e)
if (shutdownReason eq ActorPublisher.NormalShutdownReason)
(new RuntimeException("BOOM")).printStackTrace()
tryOnError(subscriber, e)
case None tryOnComplete(subscriber)
} catch {
case _: SpecViolation // nothing to do
}
}

View file

@ -9,7 +9,11 @@ import org.reactivestreams.{ Subscriber, Publisher }
* INTERNAL API
*/
private[akka] case object EmptyPublisher extends Publisher[Nothing] {
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = subscriber.onComplete()
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
try tryOnComplete(subscriber) catch {
case _: SpecViolation // nothing to do
}
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
override def toString: String = "empty-publisher" // FIXME is this a good name?
}

View file

@ -55,7 +55,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
override protected def shutdown(completed: Boolean): Unit = {
if (exposedPublisher ne null) {
if (completed) exposedPublisher.shutdown(None)
else exposedPublisher.shutdown(Some(new IllegalStateException("Cannot subscribe to shutdown publisher")))
else exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
}
afterShutdown()
}

View file

@ -17,6 +17,7 @@ import akka.pattern.pipe
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import akka.actor.DeadLetterSuppression
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -42,7 +43,7 @@ private[akka] object FuturePublisher {
* INTERNAL API
*/
// FIXME why do we need to have an actor to drive a Future?
private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMaterializerSettings) extends Actor with SoftShutdown {
private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMaterializerSettings) extends Actor {
import akka.stream.impl.FuturePublisher.FutureSubscription
import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel
import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore
@ -102,17 +103,22 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate
def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription push(subscriptions(subscription)) }
def push(subscriber: Subscriber[Any]): Unit = futureValue match {
case Some(Success(value))
tryOnNext(subscriber, value)
tryOnComplete(subscriber)
removeSubscriber(subscriber)
case Some(Failure(t))
tryOnError(subscriber, t)
removeSubscriber(subscriber)
case None // not completed yet
}
def push(subscriber: Subscriber[Any]): Unit =
futureValue match {
case Some(someValue) try someValue match {
case Success(value)
tryOnNext(subscriber, value)
tryOnComplete(subscriber)
case Failure(t)
shutdownReason = Some(t)
tryOnError(subscriber, t)
} catch {
case _: SpecViolation // continue
} finally {
removeSubscriber(subscriber)
}
case None // not completed yet
}
def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
if (subscribers.contains(subscriber)) // FIXME this is not legal AFAICT, needs to check identity, not equality
@ -132,7 +138,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate
subscribers -= subscriber
if (subscribers.isEmpty) {
exposedPublisher.shutdown(shutdownReason)
softShutdown()
context.stop(self)
}
}

View file

@ -129,11 +129,12 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: ActorFl
case Unitialized | Initialized | Cancelled
if (exposedPublisher ne null) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
case Completed
tryOnComplete(subscriber)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
tryOnComplete(subscriber)
case Errored(e)
tryOnError(subscriber, e)
exposedPublisher.shutdown(Some(e))
if (!e.isInstanceOf[SpecViolation])
tryOnError(subscriber, e)
}
// if onComplete or onError throws we let normal supervision take care of it,
// see reactive-streams specification rule 2:13

View file

@ -40,6 +40,7 @@ private[akka] object MultiStreamOutputProcessor {
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: Cancellable)
extends SimpleOutputs(actor, pump) with Publisher[Any] {
import ReactiveStreamsCompliance._
import SubstreamOutput._
@ -80,8 +81,9 @@ private[akka] object MultiStreamOutputProcessor {
}
private def closeSubscriber(s: Subscriber[Any], withState: CompletedState): Unit = withState match {
case Completed s.onComplete()
case Failed(e) s.onError(e)
case Completed tryOnComplete(s)
case Failed(e: SpecViolation) // nothing to do
case Failed(e) tryOnError(s, e)
}
override def subscribe(s: Subscriber[_ >: Any]): Unit = {
@ -89,9 +91,12 @@ private[akka] object MultiStreamOutputProcessor {
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber))
case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
case _: Attached
tryOnError(s, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber))
case c: CompletedState
closeSubscriber(s, c)
case Open
throw new IllegalStateException("Publisher cannot become open after being used before")
}
}
}
@ -99,8 +104,9 @@ private[akka] object MultiStreamOutputProcessor {
def attachSubscriber(s: Subscriber[Any]): Unit =
if (subscriber eq null) {
subscriber = s
subscriber.onSubscribe(subscription)
} else subscriber.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher"))
tryOnSubscribe(subscriber, subscription)
} else
tryOnError(subscriber, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber))
}
}

View file

@ -23,11 +23,13 @@ private[akka] object SubscriberManagement {
}
object Completed extends EndOfStream {
def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onComplete()
import ReactiveStreamsCompliance._
def apply[T](subscriber: Subscriber[T]): Unit = tryOnComplete(subscriber)
}
final case class ErrorCompleted(cause: Throwable) extends EndOfStream {
def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onError(cause)
import ReactiveStreamsCompliance._
def apply[T](subscriber: Subscriber[T]): Unit = tryOnError(subscriber, cause)
}
val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down Publisher"))
@ -37,9 +39,11 @@ private[akka] object SubscriberManagement {
* INTERNAL API
*/
private[akka] trait SubscriptionWithCursor[T] extends Subscription with ResizableMultiReaderRingBuffer.Cursor {
import ReactiveStreamsCompliance._
def subscriber: Subscriber[_ >: T]
def dispatch(element: T): Unit = subscriber.onNext(element)
def dispatch(element: T): Unit = tryOnNext(subscriber, element)
var active = true

View file

@ -43,7 +43,7 @@ private[akka] object TickPublisher {
* otherwise the tick element is dropped.
*/
private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any,
settings: ActorFlowMaterializerSettings, cancelled: AtomicBoolean) extends Actor with SoftShutdown {
settings: ActorFlowMaterializerSettings, cancelled: AtomicBoolean) extends Actor {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._
import akka.stream.impl.TickPublisher._
import ReactiveStreamsCompliance._
@ -122,9 +122,10 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
override def postStop(): Unit = {
tickTask.foreach(_.cancel)
cancelled.set(true)
if (subscriber ne null) tryOnComplete(subscriber)
if (exposedPublisher ne null)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
if (subscriber ne null)
tryOnComplete(subscriber)
}
}

View file

@ -149,6 +149,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
*/
private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boolean, log: LoggingAdapter)
extends BoundaryStage {
import ReactiveStreamsCompliance._
private var exposedPublisher: ActorPublisher[Any] = _
@ -163,14 +164,14 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole
private def onNext(elem: Any): Unit = {
downstreamDemand -= 1
subscriber.onNext(elem)
tryOnNext(subscriber, elem)
}
private def complete(): Unit = {
if (!downstreamCompleted) {
downstreamCompleted = true
if (subscriber ne null) subscriber.onComplete()
if (exposedPublisher ne null) exposedPublisher.shutdown(None)
if (subscriber ne null) tryOnComplete(subscriber)
}
}
@ -179,8 +180,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole
downstreamCompleted = true
if (debugLogging)
log.debug("fail due to: {}", e.getMessage)
if (subscriber ne null) subscriber.onError(e)
if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e)
}
}
@ -211,8 +212,9 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole
subscribers foreach { sub
if (subscriber eq null) {
subscriber = sub
subscriber.onSubscribe(new ActorSubscription(actor, subscriber))
} else sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}"))
tryOnSubscribe(subscriber, new ActorSubscription(actor, subscriber))
} else
tryOnError(sub, new IllegalStateException(s"${Logging.simpleName(this)} ${SupportsOnlyASingleSubscriber}"))
}
protected def waitingExposedPublisher: Actor.Receive = {

View file

@ -10,16 +10,18 @@ import org.reactivestreams.Subscriber
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import akka.stream.impl.ReactiveStreamsCompliance
/**
* INTERNAL API
*/
private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[I, O]])(implicit ec: ExecutionContext) extends Processor[I, O] {
import ReactiveStreamsCompliance._
@volatile private var impl: Processor[I, O] = _
private val setVarFuture = implFuture.andThen { case Success(p) impl = p }
override def onSubscribe(s: Subscription): Unit = setVarFuture.onComplete {
case Success(x) x.onSubscribe(s)
case Success(x) tryOnSubscribe(x, s)
case Failure(_) s.cancel()
}

View file

@ -40,6 +40,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
bindCmd: Tcp.Bind, settings: ActorFlowMaterializerSettings) extends Actor
with Pump with ActorLogging {
import ReactiveStreamsCompliance._
import context.system
object primaryOutputs extends SimpleOutputs(self, pump = this) {
@ -89,8 +90,8 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
val ex = BindFailedException
localAddressPromise.failure(ex)
unbindPromise.failure(ex)
flowSubscriber.onError(ex)
fail(ex)
try tryOnError(flowSubscriber, ex)
finally fail(ex)
}
def running: Receive = {

View file

@ -152,6 +152,7 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS
}
final case class LazyEmptySource[Out]() extends KeyedActorFlowSource[Out, Promise[Unit]] {
import ReactiveStreamsCompliance._
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorFlowMaterializer, flowName: String) = {
val created = create(materializer, flowName)
created._1.subscribe(flowSubscriber)
@ -166,14 +167,14 @@ final case class LazyEmptySource[Out]() extends KeyedActorFlowSource[Out, Promis
// so we can enable it then, though it will require external completing of the promise
val pub = new Publisher[Unit] {
override def subscribe(s: Subscriber[_ >: Unit]) = {
s.onSubscribe(new Subscription {
tryOnSubscribe(s, new Subscription {
override def request(n: Long): Unit = ()
override def cancel(): Unit = p.success(())
})
p.future.onComplete {
case Success(_) s.onComplete()
case Failure(ex) s.onError(ex) // due to external signal
case Success(_) tryOnComplete(s)
case Failure(ex) tryOnError(s, ex) // due to external signal
}(materializer.asInstanceOf[ActorFlowMaterializerImpl].executionContext) // TODO: Should it use this EC or something else?
}
}

View file

@ -4,7 +4,6 @@
package akka.stream.scaladsl
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference }
import akka.stream.FlowMaterializer
import akka.stream.impl.Ast
import akka.stream.impl.Ast.FanInAstNode
@ -12,8 +11,8 @@ import akka.stream.impl.{ DirectedGraphBuilder, Edge }
import akka.stream.impl.Ast.Defaults._
import akka.stream.scaladsl.OperationAttributes._
import org.reactivestreams._
import scala.language.existentials
import akka.stream.impl.ReactiveStreamsCompliance
/**
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
@ -528,43 +527,45 @@ private[akka] object FlowGraphInternal {
final class IdentityProcessor extends Processor[Any, Any] {
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage._
import ReactiveStreamsCompliance._
@volatile private var subscriber: Subscriber[Any] = null
private val state = new AtomicReference[AnyRef]()
override def onSubscribe(s: Subscription) =
if (subscriber != null) subscriber.onSubscribe(s)
if (subscriber != null) tryOnSubscribe(subscriber, s)
else state.getAndSet(OnSubscribe(s)) match {
case sub: Subscriber[Any] sub.onSubscribe(s)
case _
case sub: Subscriber[Any] @unchecked tryOnSubscribe(sub, s)
case _
}
override def onError(t: Throwable) =
if (subscriber != null) subscriber.onError(t)
if (subscriber != null) tryOnError(subscriber, t)
else state.getAndSet(OnError(t)) match {
case sub: Subscriber[Any] sub.onError(t)
case _
case sub: Subscriber[Any] @unchecked tryOnError(sub, t)
case _
}
override def onComplete() =
if (subscriber != null) subscriber.onComplete()
if (subscriber != null) tryOnComplete(subscriber)
else state.getAndSet(OnComplete) match {
case sub: Subscriber[Any] sub.onComplete()
case _
case sub: Subscriber[Any] @unchecked tryOnComplete(sub)
case _
}
override def onNext(t: Any) =
if (subscriber != null) subscriber.onNext(t)
if (subscriber != null) tryOnNext(subscriber, t)
else throw new IllegalStateException("IdentityProcessor received onNext before signaling demand")
override def subscribe(sub: Subscriber[_ >: Any]) =
if (subscriber != null) sub.onError(new IllegalStateException("IdentityProcessor can only be subscribed to once"))
if (subscriber != null)
tryOnError(subscriber, new IllegalStateException("IdentityProcessor " + SupportsOnlyASingleSubscriber))
else {
subscriber = sub.asInstanceOf[Subscriber[Any]]
if (!state.compareAndSet(null, sub)) state.get match {
case OnSubscribe(s) sub.onSubscribe(s)
case OnError(t) sub.onError(t)
case OnComplete sub.onComplete()
case OnSubscribe(s) tryOnSubscribe(sub, s)
case OnError(t) tryOnError(sub, t)
case OnComplete tryOnComplete(sub)
case s throw new IllegalStateException(s"IdentityProcessor found unknown state $s")
}
}