!str Improve ActorPublisher

* proper order of processing subscriptions
* exception passing on error for new subscribers
This commit is contained in:
Patrik Nordwall 2014-03-31 14:15:14 +02:00 committed by Roland Kuhn
parent 0608db4b0d
commit 3c14e29bef
6 changed files with 94 additions and 38 deletions

View file

@ -14,21 +14,33 @@ import Ast.{ AstNode, Recover, Transform }
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, actorRef2Scala }
import akka.stream.GeneratorSettings
class ActorSubscriber[T]( final val impl: ActorRef) extends Subscriber[T] {
/**
* INTERNAL API
*/
private[akka] class ActorSubscriber[T]( final val impl: ActorRef) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! OnError(cause)
override def onComplete(): Unit = impl ! OnComplete
override def onNext(element: T): Unit = impl ! OnNext(element)
override def onSubscribe(subscription: Subscription): Unit = impl ! OnSubscribe(subscription)
}
trait ActorConsumerLike[T] extends Consumer[T] {
/**
* INTERNAL API
*/
private[akka] trait ActorConsumerLike[T] extends Consumer[T] {
def impl: ActorRef
override val getSubscriber: Subscriber[T] = new ActorSubscriber[T](impl)
}
class ActorConsumer[T]( final val impl: ActorRef) extends ActorConsumerLike[T]
/**
* INTERNAL API
*/
private[akka] class ActorConsumer[T]( final val impl: ActorRef) extends ActorConsumerLike[T]
object ActorConsumer {
/**
* INTERNAL API
*/
private[akka] object ActorConsumer {
import Ast._
def props(gen: GeneratorSettings, op: AstNode) = op match {
@ -37,7 +49,10 @@ object ActorConsumer {
}
}
abstract class AbstractActorConsumer(val settings: GeneratorSettings) extends Actor {
/**
* INTERNAL API
*/
private[akka] abstract class AbstractActorConsumer(val settings: GeneratorSettings) extends Actor {
import ActorProcessor._
/**
@ -102,7 +117,10 @@ abstract class AbstractActorConsumer(val settings: GeneratorSettings) extends Ac
}
}
class TransformActorConsumer(_settings: GeneratorSettings, op: Ast.Transform) extends AbstractActorConsumer(_settings) with ActorLogging {
/**
* INTERNAL API
*/
private[akka] class TransformActorConsumer(_settings: GeneratorSettings, op: Ast.Transform) extends AbstractActorConsumer(_settings) with ActorLogging {
private var state = op.zero
private var onCompleteCalled = false
@ -133,7 +151,10 @@ class TransformActorConsumer(_settings: GeneratorSettings, op: Ast.Transform) ex
}
}
class RecoverActorConsumer(_settings: GeneratorSettings, op: Ast.Recover) extends TransformActorConsumer(_settings, op.t) {
/**
* INTERNAL API
*/
private[akka] class RecoverActorConsumer(_settings: GeneratorSettings, op: Ast.Recover) extends TransformActorConsumer(_settings, op.t) {
override def onNext(elem: Any): Unit = {
super.onNext(Success(elem))
}

View file

@ -104,6 +104,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
def failureReceived(e: Throwable): Unit = fail(e)
def fail(e: Throwable): Unit = {
shutdownReason = Some(e)
log.error(e, "failure during processing") // FIXME: escalate to supervisor instead
abortDownstream(e)
if (upstream ne null) upstream.cancel()
@ -218,18 +219,19 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
////////////////////// Shutdown and cleanup (graceful and abort) //////////////////////
var isShuttingDown = false
var completed = false
var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
// Called by SubscriberManagement to signal that output buffer finished (flushed or aborted)
override def shutdown(completed: Boolean): Unit = {
isShuttingDown = true
this.completed = completed
if (completed)
shutdownReason = None
context.stop(self)
}
override def postStop(): Unit = {
if (exposedPublisher ne null)
exposedPublisher.shutdown(completed)
exposedPublisher.shutdown(shutdownReason)
// Non-gracefully stopped, do our best here
if (!isShuttingDown)
abortDownstream(new IllegalStateException("Processor actor terminated abruptly"))

View file

@ -5,6 +5,7 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
import org.reactivestreams.api.{ Consumer, Producer }
import org.reactivestreams.spi.{ Publisher, Subscriber }
import akka.actor.ActorRef
@ -17,7 +18,10 @@ import akka.actor.Props
import scala.util.control.NoStackTrace
import akka.stream.Stream
trait ActorProducerLike[T] extends Producer[T] {
/**
* INTERNAL API
*/
private[akka] trait ActorProducerLike[T] extends Producer[T] {
def impl: ActorRef
override val getPublisher: Publisher[T] = {
val a = new ActorPublisher[T](impl)
@ -32,27 +36,41 @@ trait ActorProducerLike[T] extends Producer[T] {
class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T]
object ActorProducer {
/**
* INTERNAL API
*/
private[akka] object ActorProducer {
def props[T](settings: GeneratorSettings, f: () T): Props =
Props(new ActorProducerImpl(f, settings))
}
final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
/**
* INTERNAL API
*/
private[akka] object ActorPublisher {
class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down spi.Publisher") with NoStackTrace
val NormalShutdownReason: Option[Throwable] = Some(new NormalShutdownException)
}
/**
* INTERNAL API
*/
private[akka] final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
// The subscriber of an subscription attempt is first placed in this list of pending subscribers.
// The actor will call takePendingSubscribers to remove it from the list when it has received the
// SubscribePending message. The AtomicReference is set to null by the shutdown method, which is
// called by the actor from postStop. Pending (unregistered) subscription attempts are denied by
// the shutdown method. Subscription attempts after shutdown can be denied immediately.
private val pendingSubscribers = new AtomicReference[List[Subscriber[T]]](Nil)
private val pendingSubscribers = new AtomicReference[immutable.Seq[Subscriber[T]]](Nil)
override def subscribe(subscriber: Subscriber[T]): Unit = {
@tailrec def doSubscribe(subscriber: Subscriber[T]): Unit = {
val current = pendingSubscribers.get
if (current eq null)
reportShutdownError(subscriber)
reportSubscribeError(subscriber)
else {
if (pendingSubscribers.compareAndSet(current, subscriber :: current))
if (pendingSubscribers.compareAndSet(current, subscriber +: current))
impl ! SubscribePending
else
doSubscribe(subscriber) // CAS retry
@ -62,40 +80,55 @@ final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
doSubscribe(subscriber)
}
def takePendingSubscribers(): List[Subscriber[T]] =
pendingSubscribers.getAndSet(Nil)
def shutdown(completed: Boolean): Unit = {
this.completed = completed
pendingSubscribers.getAndSet(null) foreach reportShutdownError
def takePendingSubscribers(): immutable.Seq[Subscriber[T]] = {
val pending = pendingSubscribers.getAndSet(Nil)
assert(pending ne null, "takePendingSubscribers must not be called after shutdown")
pending.reverse
}
@volatile private var completed: Boolean = false
def shutdown(reason: Option[Throwable]): Unit = {
shutdownReason = reason
val pending = pendingSubscribers.getAndSet(null)
assert(pending ne null, "shutdown must only be called once, from postStop")
pending foreach reportSubscribeError
}
private def reportShutdownError(subscriber: Subscriber[T]): Unit =
if (completed) subscriber.onComplete()
else subscriber.onError(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher"))
@volatile private var shutdownReason: Option[Throwable] = None
private def reportSubscribeError(subscriber: Subscriber[T]): Unit =
shutdownReason match {
case Some(e) subscriber.onError(e)
case None subscriber.onComplete()
}
}
class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[T]) extends SubscriptionWithCursor[T] {
/**
* INTERNAL API
*/
private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[T]) extends SubscriptionWithCursor[T] {
override def requestMore(elements: Int): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
else impl ! RequestMore(this, elements)
override def cancel(): Unit = impl ! Cancel(this)
}
object ActorProducerImpl {
/**
* INTERNAL API
*/
private[akka] object ActorProducerImpl {
case object Generate
}
class ActorProducerImpl[T](f: () T, settings: GeneratorSettings) extends Actor with ActorLogging with SubscriberManagement[T] {
/**
* INTERNAL API
*/
private[akka] class ActorProducerImpl[T](f: () T, settings: GeneratorSettings) extends Actor with ActorLogging with SubscriberManagement[T] {
import Stream._
import ActorProducerImpl._
type S = ActorSubscription[T]
var pub: ActorPublisher[T] = _
var completed = false
var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
context.setReceiveTimeout(settings.downstreamSubscriptionTimeout)
@ -126,7 +159,7 @@ class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Act
}
override def postStop(): Unit = {
pub.shutdown(completed)
pub.shutdown(shutdownReason)
}
private var demand = 0
@ -136,8 +169,8 @@ class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Act
pushToDownstream(f())
true
} catch {
case Stop { completeDownstream(); completed = true; false }
case NonFatal(e) { abortDownstream(e); false }
case Stop { completeDownstream(); shutdownReason = None; false }
case NonFatal(e) { abortDownstream(e); shutdownReason = Some(e); false }
}
demand -= 1
if (continue && demand > 0) self ! Generate

View file

@ -55,7 +55,6 @@ private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings
var exposedPublisher: ActorPublisher[Any] = _
var subscribers = Set.empty[Subscriber[Any]]
var workers = Map.empty[ActorRef, Subscriber[Any]]
var completed = false
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
@ -102,7 +101,7 @@ private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings
override def postStop(): Unit = {
if (exposedPublisher ne null)
exposedPublisher.shutdown(completed)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
}
}

View file

@ -5,6 +5,8 @@ package akka.stream.impl
import org.reactivestreams.spi.Subscription
// FIXME INTERNAL API
case class OnSubscribe(subscription: Subscription)
// TODO performance improvement: skip wrapping ordinary elements in OnNext
case class OnNext(element: Any)

View file

@ -353,8 +353,7 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re
val downstream2 = StreamTestKit.consumerProbe[String]()
producer.produceTo(downstream2)
// IllegalStateException shut down
downstream2.expectError().getClass should be(classOf[IllegalStateException])
downstream2.expectError() should be(TestException)
}
}
@ -367,7 +366,7 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re
val downstream2 = StreamTestKit.consumerProbe[Any]()
producer.produceTo(downstream2)
// IllegalStateException shut down
downstream2.expectError().getClass should be(classOf[IllegalStateException])
downstream2.expectError().isInstanceOf[IllegalStateException] should be(true)
}
}