diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index f0aa11b09a..b024a1dd4c 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -199,8 +199,9 @@ trait ActorPublisher[T] extends Actor { case Active | PreSubscriber ⇒ lifecycleState = Completed if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives - tryOnComplete(subscriber) - subscriber = null // not used after onComplete + try tryOnComplete(subscriber) finally { + subscriber = null // not used after onComplete + } case Completed ⇒ throw new IllegalStateException("onComplete must only be called once") case _: ErrorEmitted ⇒ @@ -216,8 +217,8 @@ trait ActorPublisher[T] extends Actor { case Active | PreSubscriber ⇒ lifecycleState = ErrorEmitted(cause) if (subscriber ne null) // otherwise onError will be called when the subscription arrives - tryOnError(subscriber, cause) - subscriber = null // not used after onError + try tryOnError(subscriber, cause) finally + subscriber = null // not used after onError case _: ErrorEmitted ⇒ throw new IllegalStateException("onError must only be called once") case Completed ⇒ @@ -249,7 +250,7 @@ trait ActorPublisher[T] extends Actor { scheduledSubscriptionTimeout.cancel() subscriber = sub lifecycleState = Active - sub.onSubscribe(new ActorPublisherSubscription(self)) + tryOnSubscribe(sub, new ActorPublisherSubscription(self)) case ErrorEmitted(cause) ⇒ tryOnError(sub, cause) case Completed ⇒ tryOnComplete(sub) case Active | Canceled ⇒ @@ -321,8 +322,8 @@ trait ActorPublisher[T] extends Actor { */ protected[akka] override def aroundPostStop(): Unit = { state.remove(self) - if (lifecycleState == Active) tryOnComplete(subscriber) - super.aroundPostStop() + try if (lifecycleState == Active) tryOnComplete(subscriber) + finally super.aroundPostStop() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 3d3d033947..d24865b52d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -4,12 +4,12 @@ package akka.stream.impl import java.util.Arrays - import akka.actor._ import akka.stream.ActorFlowMaterializerSettings import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError } import org.reactivestreams.{ Subscriber, Subscription, Processor } +import akka.event.Logging /** * INTERNAL API @@ -142,6 +142,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) * INTERNAL API */ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends DefaultOutputTransferStates { + import ReactiveStreamsCompliance._ protected var exposedPublisher: ActorPublisher[Any] = _ @@ -156,22 +157,22 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D def enqueueOutputElement(elem: Any): Unit = { downstreamDemand -= 1 - subscriber.onNext(elem) + tryOnNext(subscriber, elem) } def complete(): Unit = { if (!downstreamCompleted) { downstreamCompleted = true - if (subscriber ne null) subscriber.onComplete() if (exposedPublisher ne null) exposedPublisher.shutdown(None) + if (subscriber ne null) tryOnComplete(subscriber) } } def cancel(e: Throwable): Unit = { if (!downstreamCompleted) { downstreamCompleted = true - if (subscriber ne null) subscriber.onError(e) if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) + if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e) } } @@ -183,8 +184,9 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D subscribers foreach { sub ⇒ if (subscriber eq null) { subscriber = sub - subscriber.onSubscribe(createSubscription()) - } else sub.onError(new IllegalStateException(s"${getClass.getSimpleName} ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}")) + tryOnSubscribe(subscriber, createSubscription()) + } else + tryOnError(sub, new IllegalStateException(s"${Logging.simpleName(this)} ${SupportsOnlyASingleSubscriber}")) } protected def waitingExposedPublisher: Actor.Receive = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 99158eba32..f776ce4943 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -16,7 +16,7 @@ import org.reactivestreams.Subscription * INTERNAL API */ private[akka] object ActorPublisher { - class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down spi.Publisher") with NoStackTrace + class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down Publisher") with NoStackTrace val NormalShutdownReason: Option[Throwable] = Some(new NormalShutdownException) def apply[T](impl: ActorRef): ActorPublisher[T] = { @@ -35,6 +35,7 @@ private[akka] object ActorPublisher { * ActorRef! If you don't need to subclass, prefer the apply() method on the companion object which takes care of this. */ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { + import ReactiveStreamsCompliance._ // The subscriber of an subscription attempt is first placed in this list of pending subscribers. // The actor will call takePendingSubscribers to remove it from the list when it has received the @@ -78,9 +79,16 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { @volatile private var shutdownReason: Option[Throwable] = None private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit = - shutdownReason match { - case Some(e) ⇒ subscriber.onError(e) - case None ⇒ subscriber.onComplete() + try shutdownReason match { + case Some(e: SpecViolation) ⇒ // ok, not allowed to call onError + case Some(e) ⇒ + if (shutdownReason eq ActorPublisher.NormalShutdownReason) + (new RuntimeException("BOOM")).printStackTrace() + + tryOnError(subscriber, e) + case None ⇒ tryOnComplete(subscriber) + } catch { + case _: SpecViolation ⇒ // nothing to do } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index f8431217af..6bf5367ba4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -9,7 +9,11 @@ import org.reactivestreams.{ Subscriber, Publisher } * INTERNAL API */ private[akka] case object EmptyPublisher extends Publisher[Nothing] { - override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = subscriber.onComplete() + import ReactiveStreamsCompliance._ + override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = + try tryOnComplete(subscriber) catch { + case _: SpecViolation ⇒ // nothing to do + } def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] override def toString: String = "empty-publisher" // FIXME is this a good name? } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index c0db401758..b878fcac03 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -55,7 +55,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu override protected def shutdown(completed: Boolean): Unit = { if (exposedPublisher ne null) { if (completed) exposedPublisher.shutdown(None) - else exposedPublisher.shutdown(Some(new IllegalStateException("Cannot subscribe to shutdown publisher"))) + else exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) } afterShutdown() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index da37bbd9aa..633b0b0a5a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -17,6 +17,7 @@ import akka.pattern.pipe import org.reactivestreams.Subscriber import org.reactivestreams.Subscription import akka.actor.DeadLetterSuppression +import scala.util.control.NonFatal /** * INTERNAL API @@ -42,7 +43,7 @@ private[akka] object FuturePublisher { * INTERNAL API */ // FIXME why do we need to have an actor to drive a Future? -private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMaterializerSettings) extends Actor with SoftShutdown { +private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMaterializerSettings) extends Actor { import akka.stream.impl.FuturePublisher.FutureSubscription import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore @@ -102,17 +103,22 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription ⇒ push(subscriptions(subscription)) } - def push(subscriber: Subscriber[Any]): Unit = futureValue match { - case Some(Success(value)) ⇒ - - tryOnNext(subscriber, value) - tryOnComplete(subscriber) - removeSubscriber(subscriber) - case Some(Failure(t)) ⇒ - tryOnError(subscriber, t) - removeSubscriber(subscriber) - case None ⇒ // not completed yet - } + def push(subscriber: Subscriber[Any]): Unit = + futureValue match { + case Some(someValue) ⇒ try someValue match { + case Success(value) ⇒ + tryOnNext(subscriber, value) + tryOnComplete(subscriber) + case Failure(t) ⇒ + shutdownReason = Some(t) + tryOnError(subscriber, t) + } catch { + case _: SpecViolation ⇒ // continue + } finally { + removeSubscriber(subscriber) + } + case None ⇒ // not completed yet + } def registerSubscriber(subscriber: Subscriber[Any]): Unit = { if (subscribers.contains(subscriber)) // FIXME this is not legal AFAICT, needs to check identity, not equality @@ -132,7 +138,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate subscribers -= subscriber if (subscribers.isEmpty) { exposedPublisher.shutdown(shutdownReason) - softShutdown() + context.stop(self) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala index f351a0e3dd..a2f68b2d3a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala @@ -129,11 +129,12 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: ActorFl case Unitialized | Initialized | Cancelled ⇒ if (exposedPublisher ne null) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) case Completed ⇒ - tryOnComplete(subscriber) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) + tryOnComplete(subscriber) case Errored(e) ⇒ - tryOnError(subscriber, e) exposedPublisher.shutdown(Some(e)) + if (!e.isInstanceOf[SpecViolation]) + tryOnError(subscriber, e) } // if onComplete or onError throws we let normal supervision take care of it, // see reactive-streams specification rule 2:13 diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index cd0ae089f5..354014905d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -40,6 +40,7 @@ private[akka] object MultiStreamOutputProcessor { class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: Cancellable) extends SimpleOutputs(actor, pump) with Publisher[Any] { + import ReactiveStreamsCompliance._ import SubstreamOutput._ @@ -80,8 +81,9 @@ private[akka] object MultiStreamOutputProcessor { } private def closeSubscriber(s: Subscriber[Any], withState: CompletedState): Unit = withState match { - case Completed ⇒ s.onComplete() - case Failed(e) ⇒ s.onError(e) + case Completed ⇒ tryOnComplete(s) + case Failed(e: SpecViolation) ⇒ // nothing to do + case Failed(e) ⇒ tryOnError(s, e) } override def subscribe(s: Subscriber[_ >: Any]): Unit = { @@ -89,9 +91,12 @@ private[akka] object MultiStreamOutputProcessor { if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) else { state.get() match { - case _: Attached ⇒ s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber)) - case c: CompletedState ⇒ closeSubscriber(s, c) - case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before") + case _: Attached ⇒ + tryOnError(s, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber)) + case c: CompletedState ⇒ + closeSubscriber(s, c) + case Open ⇒ + throw new IllegalStateException("Publisher cannot become open after being used before") } } } @@ -99,8 +104,9 @@ private[akka] object MultiStreamOutputProcessor { def attachSubscriber(s: Subscriber[Any]): Unit = if (subscriber eq null) { subscriber = s - subscriber.onSubscribe(subscription) - } else subscriber.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) + tryOnSubscribe(subscriber, subscription) + } else + tryOnError(subscriber, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index 6285dc2153..d6acf34cf9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -23,11 +23,13 @@ private[akka] object SubscriberManagement { } object Completed extends EndOfStream { - def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onComplete() + import ReactiveStreamsCompliance._ + def apply[T](subscriber: Subscriber[T]): Unit = tryOnComplete(subscriber) } final case class ErrorCompleted(cause: Throwable) extends EndOfStream { - def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onError(cause) + import ReactiveStreamsCompliance._ + def apply[T](subscriber: Subscriber[T]): Unit = tryOnError(subscriber, cause) } val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down Publisher")) @@ -37,9 +39,11 @@ private[akka] object SubscriberManagement { * INTERNAL API */ private[akka] trait SubscriptionWithCursor[T] extends Subscription with ResizableMultiReaderRingBuffer.Cursor { + import ReactiveStreamsCompliance._ + def subscriber: Subscriber[_ >: T] - def dispatch(element: T): Unit = subscriber.onNext(element) + def dispatch(element: T): Unit = tryOnNext(subscriber, element) var active = true diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 15e8518395..a318c3e318 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -43,7 +43,7 @@ private[akka] object TickPublisher { * otherwise the tick element is dropped. */ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any, - settings: ActorFlowMaterializerSettings, cancelled: AtomicBoolean) extends Actor with SoftShutdown { + settings: ActorFlowMaterializerSettings, cancelled: AtomicBoolean) extends Actor { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ import akka.stream.impl.TickPublisher._ import ReactiveStreamsCompliance._ @@ -122,9 +122,10 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite override def postStop(): Unit = { tickTask.foreach(_.cancel) cancelled.set(true) - if (subscriber ne null) tryOnComplete(subscriber) if (exposedPublisher ne null) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) + if (subscriber ne null) + tryOnComplete(subscriber) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 2bd44f2eea..1ab3cdf0df 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -149,6 +149,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int) */ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boolean, log: LoggingAdapter) extends BoundaryStage { + import ReactiveStreamsCompliance._ private var exposedPublisher: ActorPublisher[Any] = _ @@ -163,14 +164,14 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole private def onNext(elem: Any): Unit = { downstreamDemand -= 1 - subscriber.onNext(elem) + tryOnNext(subscriber, elem) } private def complete(): Unit = { if (!downstreamCompleted) { downstreamCompleted = true - if (subscriber ne null) subscriber.onComplete() if (exposedPublisher ne null) exposedPublisher.shutdown(None) + if (subscriber ne null) tryOnComplete(subscriber) } } @@ -179,8 +180,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole downstreamCompleted = true if (debugLogging) log.debug("fail due to: {}", e.getMessage) - if (subscriber ne null) subscriber.onError(e) if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) + if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e) } } @@ -211,8 +212,9 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole subscribers foreach { sub ⇒ if (subscriber eq null) { subscriber = sub - subscriber.onSubscribe(new ActorSubscription(actor, subscriber)) - } else sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}")) + tryOnSubscribe(subscriber, new ActorSubscription(actor, subscriber)) + } else + tryOnError(sub, new IllegalStateException(s"${Logging.simpleName(this)} ${SupportsOnlyASingleSubscriber}")) } protected def waitingExposedPublisher: Actor.Receive = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala index 8e4d3518f1..f5be974f28 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala @@ -10,16 +10,18 @@ import org.reactivestreams.Subscriber import scala.concurrent.Future import scala.util.Failure import scala.util.Success +import akka.stream.impl.ReactiveStreamsCompliance /** * INTERNAL API */ private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[I, O]])(implicit ec: ExecutionContext) extends Processor[I, O] { + import ReactiveStreamsCompliance._ @volatile private var impl: Processor[I, O] = _ private val setVarFuture = implFuture.andThen { case Success(p) ⇒ impl = p } override def onSubscribe(s: Subscription): Unit = setVarFuture.onComplete { - case Success(x) ⇒ x.onSubscribe(s) + case Success(x) ⇒ tryOnSubscribe(x, s) case Failure(_) ⇒ s.cancel() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 295a4d2b89..824497b485 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -40,6 +40,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket flowSubscriber: Subscriber[StreamTcp.IncomingConnection], bindCmd: Tcp.Bind, settings: ActorFlowMaterializerSettings) extends Actor with Pump with ActorLogging { + import ReactiveStreamsCompliance._ import context.system object primaryOutputs extends SimpleOutputs(self, pump = this) { @@ -89,8 +90,8 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket val ex = BindFailedException localAddressPromise.failure(ex) unbindPromise.failure(ex) - flowSubscriber.onError(ex) - fail(ex) + try tryOnError(flowSubscriber, ex) + finally fail(ex) } def running: Receive = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index fa1f2c8191..ed8e058fd2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -152,6 +152,7 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS } final case class LazyEmptySource[Out]() extends KeyedActorFlowSource[Out, Promise[Unit]] { + import ReactiveStreamsCompliance._ override def attach(flowSubscriber: Subscriber[Out], materializer: ActorFlowMaterializer, flowName: String) = { val created = create(materializer, flowName) created._1.subscribe(flowSubscriber) @@ -166,14 +167,14 @@ final case class LazyEmptySource[Out]() extends KeyedActorFlowSource[Out, Promis // so we can enable it then, though it will require external completing of the promise val pub = new Publisher[Unit] { override def subscribe(s: Subscriber[_ >: Unit]) = { - s.onSubscribe(new Subscription { + tryOnSubscribe(s, new Subscription { override def request(n: Long): Unit = () override def cancel(): Unit = p.success(()) }) p.future.onComplete { - case Success(_) ⇒ s.onComplete() - case Failure(ex) ⇒ s.onError(ex) // due to external signal + case Success(_) ⇒ tryOnComplete(s) + case Failure(ex) ⇒ tryOnError(s, ex) // due to external signal }(materializer.asInstanceOf[ActorFlowMaterializerImpl].executionContext) // TODO: Should it use this EC or something else? } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index 3e38421378..f68327bdb9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -4,7 +4,6 @@ package akka.stream.scaladsl import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } - import akka.stream.FlowMaterializer import akka.stream.impl.Ast import akka.stream.impl.Ast.FanInAstNode @@ -12,8 +11,8 @@ import akka.stream.impl.{ DirectedGraphBuilder, Edge } import akka.stream.impl.Ast.Defaults._ import akka.stream.scaladsl.OperationAttributes._ import org.reactivestreams._ - import scala.language.existentials +import akka.stream.impl.ReactiveStreamsCompliance /** * Fan-in and fan-out vertices in the [[FlowGraph]] implements @@ -528,43 +527,45 @@ private[akka] object FlowGraphInternal { final class IdentityProcessor extends Processor[Any, Any] { import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage._ + import ReactiveStreamsCompliance._ @volatile private var subscriber: Subscriber[Any] = null private val state = new AtomicReference[AnyRef]() override def onSubscribe(s: Subscription) = - if (subscriber != null) subscriber.onSubscribe(s) + if (subscriber != null) tryOnSubscribe(subscriber, s) else state.getAndSet(OnSubscribe(s)) match { - case sub: Subscriber[Any] ⇒ sub.onSubscribe(s) - case _ ⇒ + case sub: Subscriber[Any] @unchecked ⇒ tryOnSubscribe(sub, s) + case _ ⇒ } override def onError(t: Throwable) = - if (subscriber != null) subscriber.onError(t) + if (subscriber != null) tryOnError(subscriber, t) else state.getAndSet(OnError(t)) match { - case sub: Subscriber[Any] ⇒ sub.onError(t) - case _ ⇒ + case sub: Subscriber[Any] @unchecked ⇒ tryOnError(sub, t) + case _ ⇒ } override def onComplete() = - if (subscriber != null) subscriber.onComplete() + if (subscriber != null) tryOnComplete(subscriber) else state.getAndSet(OnComplete) match { - case sub: Subscriber[Any] ⇒ sub.onComplete() - case _ ⇒ + case sub: Subscriber[Any] @unchecked ⇒ tryOnComplete(sub) + case _ ⇒ } override def onNext(t: Any) = - if (subscriber != null) subscriber.onNext(t) + if (subscriber != null) tryOnNext(subscriber, t) else throw new IllegalStateException("IdentityProcessor received onNext before signaling demand") override def subscribe(sub: Subscriber[_ >: Any]) = - if (subscriber != null) sub.onError(new IllegalStateException("IdentityProcessor can only be subscribed to once")) + if (subscriber != null) + tryOnError(subscriber, new IllegalStateException("IdentityProcessor " + SupportsOnlyASingleSubscriber)) else { subscriber = sub.asInstanceOf[Subscriber[Any]] if (!state.compareAndSet(null, sub)) state.get match { - case OnSubscribe(s) ⇒ sub.onSubscribe(s) - case OnError(t) ⇒ sub.onError(t) - case OnComplete ⇒ sub.onComplete() + case OnSubscribe(s) ⇒ tryOnSubscribe(sub, s) + case OnError(t) ⇒ tryOnError(sub, t) + case OnComplete ⇒ tryOnComplete(sub) case s ⇒ throw new IllegalStateException(s"IdentityProcessor found unknown state $s") } }