From 3c14e29befeb504a6e96fe171d3e7986143fca8a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 31 Mar 2014 14:15:14 +0200 Subject: [PATCH] !str Improve ActorPublisher * proper order of processing subscriptions * exception passing on error for new subscribers --- .../akka/stream/impl/ActorConsumer.scala | 35 ++++++-- .../akka/stream/impl/ActorProcessor.scala | 8 +- .../akka/stream/impl/ActorProducer.scala | 79 +++++++++++++------ .../akka/stream/impl/IterableProducer.scala | 3 +- .../scala/akka/stream/impl/Messages.scala | 2 + .../test/scala/akka/stream/StreamSpec.scala | 5 +- 6 files changed, 94 insertions(+), 38 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala index 29f06b73c0..4d14f08b17 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala @@ -14,21 +14,33 @@ import Ast.{ AstNode, Recover, Transform } import akka.actor.{ Actor, ActorLogging, ActorRef, Props, actorRef2Scala } import akka.stream.GeneratorSettings -class ActorSubscriber[T]( final val impl: ActorRef) extends Subscriber[T] { +/** + * INTERNAL API + */ +private[akka] class ActorSubscriber[T]( final val impl: ActorRef) extends Subscriber[T] { override def onError(cause: Throwable): Unit = impl ! OnError(cause) override def onComplete(): Unit = impl ! OnComplete override def onNext(element: T): Unit = impl ! OnNext(element) override def onSubscribe(subscription: Subscription): Unit = impl ! OnSubscribe(subscription) } -trait ActorConsumerLike[T] extends Consumer[T] { +/** + * INTERNAL API + */ +private[akka] trait ActorConsumerLike[T] extends Consumer[T] { def impl: ActorRef override val getSubscriber: Subscriber[T] = new ActorSubscriber[T](impl) } -class ActorConsumer[T]( final val impl: ActorRef) extends ActorConsumerLike[T] +/** + * INTERNAL API + */ +private[akka] class ActorConsumer[T]( final val impl: ActorRef) extends ActorConsumerLike[T] -object ActorConsumer { +/** + * INTERNAL API + */ +private[akka] object ActorConsumer { import Ast._ def props(gen: GeneratorSettings, op: AstNode) = op match { @@ -37,7 +49,10 @@ object ActorConsumer { } } -abstract class AbstractActorConsumer(val settings: GeneratorSettings) extends Actor { +/** + * INTERNAL API + */ +private[akka] abstract class AbstractActorConsumer(val settings: GeneratorSettings) extends Actor { import ActorProcessor._ /** @@ -102,7 +117,10 @@ abstract class AbstractActorConsumer(val settings: GeneratorSettings) extends Ac } } -class TransformActorConsumer(_settings: GeneratorSettings, op: Ast.Transform) extends AbstractActorConsumer(_settings) with ActorLogging { +/** + * INTERNAL API + */ +private[akka] class TransformActorConsumer(_settings: GeneratorSettings, op: Ast.Transform) extends AbstractActorConsumer(_settings) with ActorLogging { private var state = op.zero private var onCompleteCalled = false @@ -133,7 +151,10 @@ class TransformActorConsumer(_settings: GeneratorSettings, op: Ast.Transform) ex } } -class RecoverActorConsumer(_settings: GeneratorSettings, op: Ast.Recover) extends TransformActorConsumer(_settings, op.t) { +/** + * INTERNAL API + */ +private[akka] class RecoverActorConsumer(_settings: GeneratorSettings, op: Ast.Recover) extends TransformActorConsumer(_settings, op.t) { override def onNext(elem: Any): Unit = { super.onNext(Success(elem)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 1a3c605f36..8c058aedd8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -104,6 +104,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) def failureReceived(e: Throwable): Unit = fail(e) def fail(e: Throwable): Unit = { + shutdownReason = Some(e) log.error(e, "failure during processing") // FIXME: escalate to supervisor instead abortDownstream(e) if (upstream ne null) upstream.cancel() @@ -218,18 +219,19 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) ////////////////////// Shutdown and cleanup (graceful and abort) ////////////////////// var isShuttingDown = false - var completed = false + var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason // Called by SubscriberManagement to signal that output buffer finished (flushed or aborted) override def shutdown(completed: Boolean): Unit = { isShuttingDown = true - this.completed = completed + if (completed) + shutdownReason = None context.stop(self) } override def postStop(): Unit = { if (exposedPublisher ne null) - exposedPublisher.shutdown(completed) + exposedPublisher.shutdown(shutdownReason) // Non-gracefully stopped, do our best here if (!isShuttingDown) abortDownstream(new IllegalStateException("Processor actor terminated abruptly")) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala index bf15a2bf21..9c78437e7d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala @@ -5,6 +5,7 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec +import scala.collection.immutable import org.reactivestreams.api.{ Consumer, Producer } import org.reactivestreams.spi.{ Publisher, Subscriber } import akka.actor.ActorRef @@ -17,7 +18,10 @@ import akka.actor.Props import scala.util.control.NoStackTrace import akka.stream.Stream -trait ActorProducerLike[T] extends Producer[T] { +/** + * INTERNAL API + */ +private[akka] trait ActorProducerLike[T] extends Producer[T] { def impl: ActorRef override val getPublisher: Publisher[T] = { val a = new ActorPublisher[T](impl) @@ -32,27 +36,41 @@ trait ActorProducerLike[T] extends Producer[T] { class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T] -object ActorProducer { +/** + * INTERNAL API + */ +private[akka] object ActorProducer { def props[T](settings: GeneratorSettings, f: () ⇒ T): Props = Props(new ActorProducerImpl(f, settings)) } -final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { +/** + * INTERNAL API + */ +private[akka] object ActorPublisher { + class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down spi.Publisher") with NoStackTrace + val NormalShutdownReason: Option[Throwable] = Some(new NormalShutdownException) +} + +/** + * INTERNAL API + */ +private[akka] final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { // The subscriber of an subscription attempt is first placed in this list of pending subscribers. // The actor will call takePendingSubscribers to remove it from the list when it has received the // SubscribePending message. The AtomicReference is set to null by the shutdown method, which is // called by the actor from postStop. Pending (unregistered) subscription attempts are denied by // the shutdown method. Subscription attempts after shutdown can be denied immediately. - private val pendingSubscribers = new AtomicReference[List[Subscriber[T]]](Nil) + private val pendingSubscribers = new AtomicReference[immutable.Seq[Subscriber[T]]](Nil) override def subscribe(subscriber: Subscriber[T]): Unit = { @tailrec def doSubscribe(subscriber: Subscriber[T]): Unit = { val current = pendingSubscribers.get if (current eq null) - reportShutdownError(subscriber) + reportSubscribeError(subscriber) else { - if (pendingSubscribers.compareAndSet(current, subscriber :: current)) + if (pendingSubscribers.compareAndSet(current, subscriber +: current)) impl ! SubscribePending else doSubscribe(subscriber) // CAS retry @@ -62,40 +80,55 @@ final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { doSubscribe(subscriber) } - def takePendingSubscribers(): List[Subscriber[T]] = - pendingSubscribers.getAndSet(Nil) - - def shutdown(completed: Boolean): Unit = { - this.completed = completed - pendingSubscribers.getAndSet(null) foreach reportShutdownError + def takePendingSubscribers(): immutable.Seq[Subscriber[T]] = { + val pending = pendingSubscribers.getAndSet(Nil) + assert(pending ne null, "takePendingSubscribers must not be called after shutdown") + pending.reverse } - @volatile private var completed: Boolean = false + def shutdown(reason: Option[Throwable]): Unit = { + shutdownReason = reason + val pending = pendingSubscribers.getAndSet(null) + assert(pending ne null, "shutdown must only be called once, from postStop") + pending foreach reportSubscribeError + } - private def reportShutdownError(subscriber: Subscriber[T]): Unit = - if (completed) subscriber.onComplete() - else subscriber.onError(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher")) + @volatile private var shutdownReason: Option[Throwable] = None + private def reportSubscribeError(subscriber: Subscriber[T]): Unit = + shutdownReason match { + case Some(e) ⇒ subscriber.onError(e) + case None ⇒ subscriber.onComplete() + } } -class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[T]) extends SubscriptionWithCursor[T] { +/** + * INTERNAL API + */ +private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[T]) extends SubscriptionWithCursor[T] { override def requestMore(elements: Int): Unit = if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") else impl ! RequestMore(this, elements) override def cancel(): Unit = impl ! Cancel(this) } -object ActorProducerImpl { +/** + * INTERNAL API + */ +private[akka] object ActorProducerImpl { case object Generate } -class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Actor with ActorLogging with SubscriberManagement[T] { +/** + * INTERNAL API + */ +private[akka] class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Actor with ActorLogging with SubscriberManagement[T] { import Stream._ import ActorProducerImpl._ type S = ActorSubscription[T] var pub: ActorPublisher[T] = _ - var completed = false + var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) @@ -126,7 +159,7 @@ class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Act } override def postStop(): Unit = { - pub.shutdown(completed) + pub.shutdown(shutdownReason) } private var demand = 0 @@ -136,8 +169,8 @@ class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Act pushToDownstream(f()) true } catch { - case Stop ⇒ { completeDownstream(); completed = true; false } - case NonFatal(e) ⇒ { abortDownstream(e); false } + case Stop ⇒ { completeDownstream(); shutdownReason = None; false } + case NonFatal(e) ⇒ { abortDownstream(e); shutdownReason = Some(e); false } } demand -= 1 if (continue && demand > 0) self ! Generate diff --git a/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala index b5027304f0..2e8ff8615d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala @@ -55,7 +55,6 @@ private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings var exposedPublisher: ActorPublisher[Any] = _ var subscribers = Set.empty[Subscriber[Any]] var workers = Map.empty[ActorRef, Subscriber[Any]] - var completed = false override val supervisorStrategy = SupervisorStrategy.stoppingStrategy @@ -102,7 +101,7 @@ private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings override def postStop(): Unit = { if (exposedPublisher ne null) - exposedPublisher.shutdown(completed) + exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala index ffe9600bc4..e03b3a5278 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala @@ -5,6 +5,8 @@ package akka.stream.impl import org.reactivestreams.spi.Subscription +// FIXME INTERNAL API + case class OnSubscribe(subscription: Subscription) // TODO performance improvement: skip wrapping ordinary elements in OnNext case class OnNext(element: Any) diff --git a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala index 39c1a1ae80..c8ae018ad1 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala @@ -353,8 +353,7 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re val downstream2 = StreamTestKit.consumerProbe[String]() producer.produceTo(downstream2) - // IllegalStateException shut down - downstream2.expectError().getClass should be(classOf[IllegalStateException]) + downstream2.expectError() should be(TestException) } } @@ -367,7 +366,7 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re val downstream2 = StreamTestKit.consumerProbe[Any]() producer.produceTo(downstream2) // IllegalStateException shut down - downstream2.expectError().getClass should be(classOf[IllegalStateException]) + downstream2.expectError().isInstanceOf[IllegalStateException] should be(true) } }